risingwave_meta/barrier/rpc.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::PartialGraphId;
use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetPartialGraphsRequest,
};
use risingwave_pb::stream_service::{
    InjectBarrierRequest, StreamingControlStreamRequest, streaming_control_stream_request,
    streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, TracedEpoch};
use crate::barrier::BackfillOrderState;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
    DatabaseCheckpointControlMetrics, IndependentCheckpointJobControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
    SubscriberType,
};
use crate::barrier::partial_graph::PartialGraphRecoverer;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::NodeToCollect;
use crate::controller::fragment::InflightFragmentInfo;
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
    SubscriptionId,
};
use crate::stream::cdc::{
    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::stream::{
    ExtendedFragmentBackfillOrder, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
    build_actor_connector_splits,
};
use crate::{MetaError, MetaResult};

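/// Packs a database id and an optional creating-job id into one 64-bit `PartialGraphId`:
/// the database id occupies the high 32 bits and the job id the low 32 bits, with
/// `u32::MAX` reserved in the low bits to mean "no creating job", i.e. the
/// database-level graph.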
pub(super) fn to_partial_graph_id(
    database_id: DatabaseId,
    creating_job_id: Option<JobId>,
) -> PartialGraphId {
    let raw_job_id = creating_job_id
        .map(|job_id| {
            assert_ne!(job_id, u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX);
    (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
}

pub(super) fn from_partial_graph_id(
    partial_graph_id: PartialGraphId,
) -> (DatabaseId, Option<JobId>) {
    let id = partial_graph_id.as_raw_id();
    let database_id = (id >> 32) as u32;
    let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
    let creating_job_id = if raw_creating_job_id == u32::MAX {
        None
    } else {
        Some(JobId::new(raw_creating_job_id))
    };
    (database_id.into(), creating_job_id)
}
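
// A minimal round-trip sketch of the encoding above. It assumes `DatabaseId` and
// `JobId` derive `PartialEq`/`Debug` and that `DatabaseId: From<u32>`, as their usage
// elsewhere in this file suggests.
#[cfg(test)]
mod partial_graph_id_tests {
    use super::*;

    #[test]
    fn partial_graph_id_round_trip() {
        let database_id: DatabaseId = 42u32.into();
        // Database-level graph: the low 32 bits hold `u32::MAX`.
        assert_eq!(
            from_partial_graph_id(to_partial_graph_id(database_id, None)),
            (database_id, None)
        );
        // Creating-job graph: the low 32 bits hold the raw job id.
        let job_id = JobId::new(7);
        assert_eq!(
            from_partial_graph_id(to_partial_graph_id(database_id, Some(job_id))),
            (database_id, Some(job_id))
        );
    }
}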

pub(super) fn build_locality_fragment_state_table_mapping(
    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
) -> HashMap<FragmentId, Vec<TableId>> {
    let mut mapping = HashMap::new();

    for (fragment_id, fragment_info) in fragment_infos {
        let mut state_table_ids = Vec::new();
        visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
            if let Some(NodeBody::LocalityProvider(locality_provider)) =
                stream_node.node_body.as_ref()
            {
                let state_table_id = locality_provider
                    .state_table
                    .as_ref()
                    .expect("must have state table")
                    .id;
                state_table_ids.push(state_table_id);
                false
            } else {
                true
            }
        });
        if !state_table_ids.is_empty() {
            mapping.insert(*fragment_id, state_table_ids);
        }
    }

    mapping
}

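/// Enumerates all partial graph ids belonging to one database: one per in-flight
/// creating job, followed by the database-level graph (`creating_job_id = None`).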
pub(super) fn database_partial_graphs<'a>(
    database_id: DatabaseId,
    creating_jobs: impl Iterator<Item = JobId> + Sized + 'a,
) -> impl Iterator<Item = PartialGraphId> + 'a {
    creating_jobs
        .map(Some)
        .chain([None])
        .map(move |creating_job_id| to_partial_graph_id(database_id, creating_job_id))
}

struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        partial_graphs: impl Iterator<Item = PartialGraphId>,
        term_id: &String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = %node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = %node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(partial_graphs);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    // The DNS information of a newly registered worker node may not have
                    // propagated to the meta node yet, causing an error. Wait for a while and retry.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = %node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = %node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

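    /// Builds a future that reconnects to a worker with unbounded retries.
    /// `ExponentialBackoff` is an infinite iterator, so the loop below can only exit by
    /// returning a handle; the trailing `unreachable!` documents that.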
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }
        .boxed()
    }

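    /// Re-establishes control streams to all known workers during recovery. Workers that
    /// cannot be reached immediately are kept in the `Reconnecting` state with a
    /// background retry future instead of failing the whole recovery.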
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        %worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}

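/// A borrowed view of a freshly (re)connected worker, used to replay
/// `CreatePartialGraph` requests so that the worker learns about all live partial graphs.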
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(self, partial_graphs: impl Iterator<Item = PartialGraphId>) {
        for partial_graph_id in partial_graphs {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(
                        PbCreatePartialGraphRequest { partial_graph_id },
                    ),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
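    /// Polls all workers for the next event: a response on a connected stream, or, when
    /// `poll_reconnect` is set, the completion of a pending reconnect. `this_opt` is an
    /// `Option<&'a mut Self>` so that the exclusive borrow can be moved out (`take`) in
    /// the reconnect arm and returned with the full lifetime `'a`, which a plain
    /// `&mut Self` receiver would not allow.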
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=%node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                                .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id);
                                                }
                                                Ok(resp)
                                            }
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }
}

pub(super) struct DatabaseInitialBarrierCollector {
    pub(super) database_id: DatabaseId,
    pub(super) initializing_partial_graphs: HashSet<PartialGraphId>,
    pub(super) database: DatabaseCheckpointControl,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("initializing_graphs", &self.initializing_partial_graphs)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.initializing_partial_graphs.is_empty()
    }

    pub(super) fn partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        assert!(self.initializing_partial_graphs.remove(&partial_graph_id));
    }

    pub(super) fn all_partial_graphs(&self) -> impl Iterator<Item = PartialGraphId> + '_ {
        database_partial_graphs(
            self.database_id,
            self.database
                .independent_checkpoint_job_controls
                .keys()
                .copied(),
        )
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        self.database
    }

    pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        self.database.is_valid_after_worker_err(worker_id)
    }
}

impl PartialGraphRecoverer<'_> {
    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        fragment_relations: &FragmentDownstreamRelation,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashSet<JobId>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
    ) -> MetaResult<DatabaseCheckpointControl> {
        fn collect_source_splits(
            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        ) -> HashMap<ActorId, Vec<SplitImpl>> {
            fragment_infos
                .flat_map(|info| info.actors.keys())
                .filter_map(|actor_id| {
                    let actor_id = *actor_id as ActorId;
                    source_splits
                        .remove(&actor_id)
                        .map(|splits| (actor_id, splits))
                })
                .collect()
        }
        fn build_mutation(
            splits: &HashMap<ActorId, Vec<SplitImpl>>,
            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
            backfill_orders: &ExtendedFragmentBackfillOrder,
            is_paused: bool,
        ) -> Mutation {
            let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
                .into_iter()
                .collect();
            Mutation::Add(AddMutation {
                // Actors built during recovery are not treated as newly added actors.
                actor_dispatchers: Default::default(),
                added_actors: Default::default(),
                actor_splits: build_actor_connector_splits(splits),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: cdc_table_snapshot_split_assignment,
                }),
                pause: is_paused,
                subscriptions_to_add: Default::default(),
                backfill_nodes_to_pause,
                new_upstream_sinks: Default::default(),
            })
        }

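        // All state tables of the already-running jobs in a database are committed by the
        // same checkpoint barrier, so their committed epochs must agree; the helper below
        // asserts this invariant and returns the shared epoch.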
        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }
        fn job_backfill_orders(
            job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
            job_id: JobId,
        ) -> UserDefinedFragmentBackfillOrder {
            UserDefinedFragmentBackfillOrder::new(
                job_extra_info
                    .get(&job_id)
                    .and_then(|info| info.backfill_orders.clone())
                    .map_or_else(HashMap::new, |orders| orders.0),
            )
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (
                                        subscription_id.as_subscriber_id(),
                                        SubscriberType::Subscription(retention),
                                    )
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, job_fragments);
                } else {
                    database_jobs.insert(job_id, (job_fragments, true));
                }
            } else {
                database_jobs.insert(job_id, (job_fragments, false));
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|(job, _)| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        // Use a different `curr_epoch` for each recovery attempt.
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, fragment_infos) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} has already caught up with upstream",
                    job_id
                );
                database_jobs
                    .try_insert(job_id, (fragment_infos, true))
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                fragment_infos
                    .values()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} of upstream {} differs from snapshot epoch {} of previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        fragment_infos,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
            HashMap::new();

        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, (fragment_infos, is_background_creating))| {
                    let status = if is_background_creating {
                        let backfill_ordering = job_backfill_orders(job_extra_info, job_id);
                        let backfill_ordering = StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                            backfill_ordering,
                            fragment_relations,
                            || fragment_infos.iter().map(|(fragment_id, fragment)| {
                                (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                            }),
                        );
                        let locality_fragment_state_table_mapping =
                            build_locality_fragment_state_table_mapping(&fragment_infos);
                        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                            &backfill_ordering,
                            &fragment_infos,
                            locality_fragment_state_table_mapping,
                        );
                        CreateStreamingJobStatus::Creating {
                            tracker: CreateMviewProgressTracker::recover(
                                job_id,
                                &fragment_infos,
                                backfill_order_state,
                                hummock_version_stats,
                            ),
                        }
                    } else {
                        CreateStreamingJobStatus::Created
                    };
                    let cdc_table_backfill_tracker =
                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
                            let cdc_fragment = fragment_infos
                                .values()
                                .find(|fragment| {
                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
                                        fragment.fragment_type_mask,
                                        &fragment.nodes,
                                    )
                                    .is_some()
                                })
                                .expect("should have parallel cdc fragment");
                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
                            let mut tracker =
                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
                            cdc_table_snapshot_split_assignment
                                .extend(tracker.reassign_splits(cdc_actors)?);
                            Some(tracker)
                        } else {
                            None
                        };
                    Ok((
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                            status,
                            cdc_table_backfill_tracker,
                        },
                    ))
                })
                .try_collect::<_, _, MetaError>()
        }?;

        let control_stream_manager = self.control_stream_manager();
        let mut builder = FragmentEdgeBuilder::new(
            database_jobs
                .values()
                .flat_map(|job| {
                    let partial_graph_id = to_partial_graph_id(database_id, None);
                    job.fragment_infos().map(move |info| {
                        (
                            info.fragment_id,
                            EdgeBuilderFragmentInfo::from_inflight(
                                info,
                                partial_graph_id,
                                control_stream_manager,
                            ),
                        )
                    })
                })
                .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
                    |(job_id, (fragments, ..))| {
                        let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
                        fragments.values().map(move |fragment| {
                            (
                                fragment.fragment_id,
                                EdgeBuilderFragmentInfo::from_inflight(
                                    fragment,
                                    partial_graph_id,
                                    control_stream_manager,
                                ),
                            )
                        })
                    },
                )),
        );
        builder.add_relations(fragment_relations);
        let mut edges = builder.build();

        {
            let new_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_infos| {
                        (
                            fragment_infos.fragment_id,
                            &fragment_infos.nodes,
                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let nodes_actors =
                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            let database_backfill_orders =
                UserDefinedFragmentBackfillOrder::merge(database_jobs.values().map(|job| {
                    if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
                        job_backfill_orders(job_extra_info, job.job_id)
                    } else {
                        UserDefinedFragmentBackfillOrder::default()
                    }
                }));
            let database_backfill_orders =
                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                    database_backfill_orders,
                    fragment_relations,
                    || {
                        database_jobs.values().flat_map(|job_fragments| {
                            job_fragments
                                .fragment_infos
                                .iter()
                                .map(|(fragment_id, fragment)| {
                                    (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                                })
                        })
                    },
                );
            let mutation = build_mutation(
                &database_job_source_splits,
                cdc_table_snapshot_split_assignment,
                &database_backfill_orders,
                is_paused,
            );

            let partial_graph_id = to_partial_graph_id(database_id, None);
            self.recover_graph(
                partial_graph_id,
                mutation,
                &barrier_info,
                &nodes_actors,
                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
                new_actors,
                DatabaseCheckpointControlMetrics::new(database_id),
            )?;
            debug!(
                %database_id,
                "inject initial barrier"
            );
        };

        let mut independent_checkpoint_job_controls: HashMap<
            JobId,
            IndependentCheckpointJobControl,
        > = HashMap::new();
        for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
                (
                    fragment_infos.fragment_id,
                    &fragment_infos.nodes,
                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                    vec![], // no subscribers for backfilling jobs
                )
            }));

            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            assert!(
                !cdc_table_snapshot_splits.contains_key(&job_id),
                "snapshot backfill job {job_id} should not have cdc backfill"
            );
            if is_paused {
                bail!("should not pause when having snapshot backfill job {job_id}");
            }
            let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
            let job_backfill_orders =
                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                    job_backfill_orders,
                    fragment_relations,
                    || {
                        info.iter().map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                    },
                );
            let mutation = build_mutation(
                &database_job_source_splits,
                Default::default(), // no cdc table backfill for snapshot backfill jobs
                &job_backfill_orders,
                false,
            );

            let job = CreatingStreamingJobControl::recover(
                database_id,
                job_id,
                upstream_table_ids,
                &database_job_log_epochs,
                snapshot_epoch,
                committed_epoch,
                &barrier_info,
                info,
                job_backfill_orders,
                fragment_relations,
                hummock_version_stats,
                node_actors,
                mutation.clone(),
                self,
            )?;
            independent_checkpoint_job_controls.insert(
                job_id,
                IndependentCheckpointJobControl::CreatingStreamingJob(job),
            );
        }

        self.control_stream_manager()
            .env
            .shared_actor_infos()
            .recover_database(
                database_id,
                database_jobs
                    .values()
                    .flat_map(|info| {
                        info.fragment_infos()
                            .map(move |fragment| (fragment, info.job_id))
                    })
                    .chain(
                        independent_checkpoint_job_controls
                            .iter()
                            .flat_map(|(job_id, job)| {
                                let job_id = *job_id;
                                job.fragment_infos()
                                    .into_iter()
                                    .flat_map(move |infos| infos.values().map(move |f| (f, job_id)))
                            }),
                    ),
            );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_info = InflightDatabaseInfo::recover(
            database_id,
            database_jobs.into_values(),
            self.control_stream_manager()
                .env
                .shared_actor_infos()
                .clone(),
        );
        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
        Ok(DatabaseCheckpointControl::recovery(
            database_id,
            database_state,
            committed_epoch,
            database_info,
            independent_checkpoint_job_controls,
        ))
    }
}

impl ControlStreamManager {
    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

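    /// Broadcasts one barrier over the control streams of all workers in `node_actors`.
    /// Every worker receives the actor set it must collect the barrier from; only the
    /// workers in `nodes_to_sync_table` receive a non-empty `table_ids_to_sync`. Returns
    /// the set of workers whose collection responses the caller must await.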
1132    pub(super) fn inject_barrier(
1133        &mut self,
1134        partial_graph_id: PartialGraphId,
1135        mutation: Option<Mutation>,
1136        barrier_info: &BarrierInfo,
1137        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
1138        table_ids_to_sync: impl Iterator<Item = TableId>,
1139        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
1140        mut new_actors: Option<StreamJobActorsToCreate>,
1141    ) -> MetaResult<NodeToCollect> {
1142        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
1143            "inject_barrier_err"
1144        ));
1145
1146        let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();
1147
1148        nodes_to_sync_table.iter().for_each(|worker_id| {
1149            assert!(node_actors.contains_key(worker_id), "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}");
1150        });
1151
1152        let mut node_need_collect = NodeToCollect::new();
1153        let table_ids_to_sync = table_ids_to_sync.collect_vec();
1154
1155        node_actors.iter()
1156            .try_for_each(|(worker_id, actor_ids_to_collect)| {
1157                assert!(!actor_ids_to_collect.is_empty(), "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}");
1158                let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
1159                    table_ids_to_sync.clone()
1160                } else {
1161                    vec![]
1162                };
1163
1164                let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
1165                    &&
1166                    let WorkerNodeState::Connected { control_stream, .. } = worker_state
1167                {
1168                    control_stream
1169                } else {
1170                    return Err(anyhow!("unconnected worker node {}", worker_id).into());
1171                };
1172
1173                {
1174                    let mutation = mutation.clone();
1175                    let barrier = Barrier {
1176                        epoch: Some(risingwave_pb::data::Epoch {
1177                            curr: barrier_info.curr_epoch(),
1178                            prev: barrier_info.prev_epoch(),
1179                        }),
1180                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
1181                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
1182                            .to_protobuf(),
1183                        kind: barrier_info.kind.to_protobuf() as i32,
1184                    };
1185
1186                    node.handle
1187                        .request_sender
1188                        .send(StreamingControlStreamRequest {
1189                            request: Some(
1190                                streaming_control_stream_request::Request::InjectBarrier(
1191                                    InjectBarrierRequest {
1192                                        request_id: Uuid::new_v4().to_string(),
1193                                        barrier: Some(barrier),
1194                                        actor_ids_to_collect: actor_ids_to_collect.iter().copied().collect(),
1195                                        table_ids_to_sync,
1196                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(worker_id))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                config_override: actor.config_override.to_string(),
                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

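                    // The request was sent successfully, so the barrier must later be
                    // collected from this worker.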
                    node_need_collect.insert(*worker_id);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch(),
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

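    /// Broadcast a `CreatePartialGraph` request for `partial_graph_id` to every connected
    /// worker. A failed send is logged as a warning rather than propagated to the caller.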
    pub(super) fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest { partial_graph_id },
                        ),
                    ),
                })
                .is_err()
            {
                let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "failed to add partial graph to worker");
            }
        });
    }

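    /// Ask every connected worker to drop the given partial graphs. As with
    /// [`Self::add_partial_graph`], a failed send is only logged as a warning.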
    pub(super) fn remove_partial_graphs(&mut self, partial_graph_ids: Vec<PartialGraphId>) {
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        });
    }

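    /// Ask every connected worker to reset the given partial graphs. Returns the set of
    /// workers the request was successfully sent to; workers whose control stream is
    /// broken are logged and skipped.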
    pub(super) fn reset_partial_graphs(
        &mut self,
        partial_graph_ids: Vec<PartialGraphId>,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(
                            streaming_control_stream_request::Request::ResetPartialGraphs(
                                ResetPartialGraphsRequest {
                                    partial_graph_ids: partial_graph_ids.clone(),
                                },
                            ),
                        ),
                    })
                    .is_err()
                {
                    warn!(%worker_id, node = ?node.host, "failed to send reset partial graphs request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
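    /// Open a streaming control stream to `node` via the shared stream client pool,
    /// initializing it with `init_request`.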
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

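/// Merge per-worker RPC errors into a single `MetaError`.
///
/// If exactly one error was reported, it is returned directly. If any error carries a
/// `Score`, the highest-scored one is picked on its own; otherwise all errors are
/// concatenated into a single message.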
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    // Build a `MetaError` from a single worker's error, annotated with the worker id.
    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    // Find the error with the highest score.
    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    // The errors do not carry scores, so simply concatenate them.
    let concat: String = errors
        .into_iter()
        .fold(format!("{message}:"), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}

#[cfg(test)]
mod test_partial_graph_id {
    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};

    #[test]
    fn test_partial_graph_id_conversion() {
        let database_id = 233.into();
        let job_id = 233.into();
        assert_eq!(
            (database_id, None),
            from_partial_graph_id(to_partial_graph_id(database_id, None))
        );
        assert_eq!(
            (database_id, Some(job_id)),
            from_partial_graph_id(to_partial_graph_id(database_id, Some(job_id)))
        );
    }
}