risingwave_meta/barrier/rpc.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::PartialGraphId;
use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetPartialGraphsRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, TracedEpoch};
use crate::barrier::BackfillOrderState;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
    SubscriberType,
};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
    SubscriptionId,
};
use crate::stream::cdc::{
    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::stream::{
    ExtendedFragmentBackfillOrder, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
    build_actor_connector_splits,
};
use crate::{MetaError, MetaResult};

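/// Encode a database id and an optional creating-job id into a single [`PartialGraphId`].
///
/// The database id occupies the high 32 bits and the job id the low 32 bits; `u32::MAX` in the
/// low bits marks the database's own (non-creating-job) partial graph. For example, database `5`
/// with no creating job encodes as `(5 << 32) | 0xFFFF_FFFF`.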
pub(super) fn to_partial_graph_id(
    database_id: DatabaseId,
    creating_job_id: Option<JobId>,
) -> PartialGraphId {
    let raw_job_id = creating_job_id
        .map(|job_id| {
            assert_ne!(job_id, u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX);
    (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
}

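/// Decode a [`PartialGraphId`] back into its database id and optional creating-job id,
/// the inverse of [`to_partial_graph_id`].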
pub(super) fn from_partial_graph_id(
    partial_graph_id: PartialGraphId,
) -> (DatabaseId, Option<JobId>) {
    let id = partial_graph_id.as_raw_id();
    let database_id = (id >> 32) as u32;
    let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
    let creating_job_id = if raw_creating_job_id == u32::MAX {
        None
    } else {
        Some(JobId::new(raw_creating_job_id))
    };
    (database_id.into(), creating_job_id)
}

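/// Scan each fragment's stream node tree for `LocalityProvider` nodes and collect their state
/// table ids, keyed by fragment id. Fragments without any `LocalityProvider` are omitted.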
pub(super) fn build_locality_fragment_state_table_mapping(
    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
) -> HashMap<FragmentId, Vec<TableId>> {
    let mut mapping = HashMap::new();

    for (fragment_id, fragment_info) in fragment_infos {
        let mut state_table_ids = Vec::new();
        visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
            if let Some(NodeBody::LocalityProvider(locality_provider)) =
                stream_node.node_body.as_ref()
            {
                let state_table_id = locality_provider
                    .state_table
                    .as_ref()
                    .expect("must have state table")
                    .id;
                state_table_ids.push(state_table_id);
                false
            } else {
                true
            }
        });
        if !state_table_ids.is_empty() {
            mapping.insert(*fragment_id, state_table_ids);
        }
    }

    mapping
}

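/// A live control stream to a single compute node, identified by its worker id and host.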
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

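/// Connection state of a worker: either an established control stream (possibly marked as
/// removed from the cluster), or a pending reconnect future.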
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

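/// Manages the control streams from the meta barrier worker to all compute nodes and
/// multiplexes barrier-related requests and responses over them.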
pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = %node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = %node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(inflight_infos);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    // The DNS information of a newly registered worker node may not have been
                    // propagated to the meta node yet, causing an error. Wait for a while and retry.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = %node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = %node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

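    /// Return a future that repeatedly tries to establish a control stream to `node` with
    /// exponential backoff (100 ms initial delay, factor 5, capped at 1 minute), never giving
    /// up until a connection succeeds.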
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }.boxed()
    }

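    /// Rebuild the manager during recovery: try to connect to every known worker once; workers
    /// that cannot be reached immediately are left in the `Reconnecting` state with a retry
    /// future attached.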
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        %worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}

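/// Borrowed view of a freshly (re)connected worker, used to send the initial
/// `CreatePartialGraph` requests for the currently inflight databases and creating jobs.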
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(
        self,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) {
        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(request),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

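/// An event yielded by [`ControlStreamManager::next_event`]: either a response from a connected
/// worker's control stream, or a notification that a worker has (re)connected.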
pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
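    /// Poll all workers for the next event. `this_opt` is `take`n (set to `None`) when a
    /// `Connected` event is returned so the mutable borrow of the newly connected worker's
    /// handle can be handed out for the full `'a` lifetime. When `poll_reconnect` is false,
    /// pending reconnect futures are not polled and only responses are returned.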
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=%node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                                .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id);
                                                }
                                                Ok(resp)
                                            }
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }

    fn collect_init_partial_graph(
        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
            [PbCreatePartialGraphRequest {
                partial_graph_id: to_partial_graph_id(database_id, None),
            }]
            .into_iter()
            .chain(
                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
                    partial_graph_id: to_partial_graph_id(database_id, Some(job_id)),
                }),
            )
        })
    }
}

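/// Tracks collection of the initial barrier injected for a single database during recovery,
/// together with the state needed to build its `DatabaseCheckpointControl` once collected.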
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    database_info: InflightDatabaseInfo,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn creating_job_ids(&self) -> impl Iterator<Item = JobId> {
        self.creating_streaming_job_controls.keys().copied()
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        let (database_id, creating_job_id) = from_partial_graph_id(resp.partial_graph_id);
        assert_eq!(self.database_id, database_id);
        if let Some(creating_job_id) = creating_job_id {
            assert!(
                !self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp),
                "unlikely to finish backfill since just recovered"
            );
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(self.node_to_collect.remove(&resp.worker_id));
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.database_state,
            self.committed_epoch,
            self.database_info,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        fragment_relations: &FragmentDownstreamRelation,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashSet<JobId>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
        injected_creating_jobs: &mut HashSet<JobId>,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
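        // Take the connector split assignments of the given fragments' actors out of the
        // shared `source_splits` map.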
        fn collect_source_splits(
            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        ) -> HashMap<ActorId, Vec<SplitImpl>> {
            fragment_infos
                .flat_map(|info| info.actors.keys())
                .filter_map(|actor_id| {
                    let actor_id = *actor_id as ActorId;
                    source_splits
                        .remove(&actor_id)
                        .map(|splits| (actor_id, splits))
                })
                .collect()
        }
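        // Build the `Add` mutation carried by the initial barrier: it re-announces source
        // splits, CDC snapshot split assignments, the pause state, and which backfill nodes
        // must start paused due to backfill ordering.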
        fn build_mutation(
            splits: &HashMap<ActorId, Vec<SplitImpl>>,
            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
            backfill_orders: &ExtendedFragmentBackfillOrder,
            is_paused: bool,
        ) -> Mutation {
            let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
                .into_iter()
                .collect();
            Mutation::Add(AddMutation {
                // Actors built during recovery are not treated as newly added actors.
                actor_dispatchers: Default::default(),
                added_actors: Default::default(),
                actor_splits: build_actor_connector_splits(splits),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: cdc_table_snapshot_split_assignment,
                }),
                pause: is_paused,
                subscriptions_to_add: Default::default(),
                backfill_nodes_to_pause,
                new_upstream_sinks: Default::default(),
            })
        }

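        // Resolve the single committed epoch shared by all state tables of the given fragments,
        // asserting that they have all committed up to the same epoch.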
        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }
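        // Look up the user-defined backfill order of a job, falling back to an empty order.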
        fn job_backfill_orders(
            job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
            job_id: JobId,
        ) -> UserDefinedFragmentBackfillOrder {
            UserDefinedFragmentBackfillOrder::new(
                job_extra_info
                    .get(&job_id)
                    .and_then(|info| info.backfill_orders.clone())
                    .map_or_else(HashMap::new, |orders| orders.0),
            )
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (
                                        subscription_id.as_subscriber_id(),
                                        SubscriberType::Subscription(retention),
                                    )
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, job_fragments);
                } else {
                    database_jobs.insert(job_id, (job_fragments, true));
                }
            } else {
                database_jobs.insert(job_id, (job_fragments, false));
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|(job, _)| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        // Use a different `curr_epoch` for each recovery attempt.
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, fragment_infos) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                database_jobs
                    .try_insert(job_id, (fragment_infos, true))
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                fragment_infos
                    .values()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        fragment_infos,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
            HashMap::new();

        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, (fragment_infos, is_background_creating))| {
                    let status = if is_background_creating {
                        let backfill_ordering = job_backfill_orders(job_extra_info, job_id);
                        let backfill_ordering = StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                            backfill_ordering,
                            fragment_relations,
                            || fragment_infos.iter().map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        }));
                        let locality_fragment_state_table_mapping =
                            build_locality_fragment_state_table_mapping(&fragment_infos);
                        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                            &backfill_ordering,
                            &fragment_infos,
                            locality_fragment_state_table_mapping,
                        );
                        CreateStreamingJobStatus::Creating {
                            tracker: CreateMviewProgressTracker::recover(
                                job_id,
                                &fragment_infos,
                                backfill_order_state,
                                hummock_version_stats,
                            ),
                        }
                    } else {
                        CreateStreamingJobStatus::Created
                    };
                    let cdc_table_backfill_tracker =
                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
                            let cdc_fragment = fragment_infos
                                .values()
                                .find(|fragment| {
                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
                                        fragment.fragment_type_mask,
                                        &fragment.nodes,
                                    )
                                    .is_some()
                                })
                                .expect("should have parallel cdc fragment");
                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
                            let mut tracker =
                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
                            cdc_table_snapshot_split_assignment
                                .extend(tracker.reassign_splits(cdc_actors)?);
                            Some(tracker)
                        } else {
                            None
                        };
                    Ok((
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                            status,
                            cdc_table_backfill_tracker,
                        },
                    ))
                })
                .try_collect::<_, _, MetaError>()
        }?;

        let mut builder = FragmentEdgeBuilder::new(
            database_jobs
                .values()
                .flat_map(|job| {
                    job.fragment_infos()
                        .map(|info| (info, to_partial_graph_id(database_id, None)))
                })
                .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
                    |(job_id, (fragments, ..))| {
                        let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
                        fragments
                            .values()
                            .map(move |fragment| (fragment, partial_graph_id))
                    },
                )),
            self,
        );
        builder.add_relations(fragment_relations);
        let mut edges = builder.build();

        let node_to_collect = {
            let new_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_infos| {
                        (
                            fragment_infos.fragment_id,
                            &fragment_infos.nodes,
                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let nodes_actors =
                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            let database_backfill_orders =
                UserDefinedFragmentBackfillOrder::merge(database_jobs.values().map(|job| {
                    if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
                        job_backfill_orders(job_extra_info, job.job_id)
                    } else {
                        UserDefinedFragmentBackfillOrder::default()
                    }
                }));
            let database_backfill_orders =
                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                    database_backfill_orders,
                    fragment_relations,
                    || {
                        database_jobs.values().flat_map(|job_fragments| {
                            job_fragments
                                .fragment_infos
                                .iter()
                                .map(|(fragment_id, fragment)| {
                                    (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                                })
                        })
                    },
                );
            let mutation = build_mutation(
                &database_job_source_splits,
                cdc_table_snapshot_split_assignment,
                &database_backfill_orders,
                is_paused,
            );

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation),
                &barrier_info,
                &nodes_actors,
                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
                nodes_actors.keys().copied(),
                Some(new_actors),
            )?;
            debug!(
                ?node_to_collect,
                %database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
                (
                    fragment_infos.fragment_id,
                    &fragment_infos.nodes,
                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                    vec![], // no subscribers for backfilling jobs
                )
            }));

            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            assert!(
                !cdc_table_snapshot_splits.contains_key(&job_id),
                "snapshot backfill job {job_id} should not have cdc backfill"
            );
            if is_paused {
                bail!("should not pause when having snapshot backfill job {job_id}");
            }
            let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
            let job_backfill_orders =
                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                    job_backfill_orders,
                    fragment_relations,
                    || {
                        info.iter().map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                    },
                );
            let mutation = build_mutation(
                &database_job_source_splits,
                Default::default(), // snapshot backfill jobs have no cdc backfill splits
                &job_backfill_orders,
                false,
            );

            // add the job_id to `injected_creating_jobs` before it attempts to call `CreatingStreamingJobControl::recover`,
            // which may create a new partial graph in CN.
            injected_creating_jobs.insert(job_id);
            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    &barrier_info,
                    info,
                    job_backfill_orders,
                    fragment_relations,
                    hummock_version_stats,
                    node_actors,
                    mutation,
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs
                .values()
                .flat_map(|info| {
                    info.fragment_infos()
                        .map(move |fragment| (fragment, info.job_id))
                })
                .chain(
                    creating_streaming_job_controls
                        .values()
                        .flat_map(|job| job.fragment_infos_with_job_id()),
                ),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_info = InflightDatabaseInfo::recover(
            database_id,
            database_jobs.into_values(),
            self.env.shared_actor_infos().clone(),
        );
        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            database_info,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }

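    /// Iterate over the workers that currently have an established control stream.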
    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

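    /// Send an `InjectBarrier` request to every worker in `node_actors` for the given partial
    /// graph, attaching the mutation and, where present, the actors to build, and return the set
    /// of workers the barrier must be collected from. Fails if any target worker is not
    /// connected.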
1147    pub(super) fn inject_barrier(
1148        &mut self,
1149        database_id: DatabaseId,
1150        creating_job_id: Option<JobId>,
1151        mutation: Option<Mutation>,
1152        barrier_info: &BarrierInfo,
1153        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
1154        table_ids_to_sync: impl Iterator<Item = TableId>,
1155        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
1156        mut new_actors: Option<StreamJobActorsToCreate>,
1157    ) -> MetaResult<NodeToCollect> {
1158        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
1159            "inject_barrier_err"
1160        ));
1161
1162        let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();
1163
1164        let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
1165
1166        nodes_to_sync_table.iter().for_each(|worker_id| {
1167            assert!(node_actors.contains_key(worker_id), "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}");
1168        });
1169
1170        let mut node_need_collect = NodeToCollect::new();
1171        let table_ids_to_sync = table_ids_to_sync.collect_vec();
1172
1173        node_actors.iter()
1174            .try_for_each(|(worker_id, actor_ids_to_collect)| {
1175                assert!(!actor_ids_to_collect.is_empty(), "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}");
1176                let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
1177                    table_ids_to_sync.clone()
1178                } else {
1179                    vec![]
1180                };
1181
1182                let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
1183                    &&
1184                    let WorkerNodeState::Connected { control_stream, .. } = worker_state
1185                {
1186                    control_stream
1187                } else {
1188                    return Err(anyhow!("unconnected worker node {}", worker_id).into());
1189                };
1190
1191                {
1192                    let mutation = mutation.clone();
1193                    let barrier = Barrier {
1194                        epoch: Some(risingwave_pb::data::Epoch {
1195                            curr: barrier_info.curr_epoch(),
1196                            prev: barrier_info.prev_epoch(),
1197                        }),
1198                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
1199                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
1200                            .to_protobuf(),
1201                        kind: barrier_info.kind.to_protobuf() as i32,
1202                    };
1203
                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        actor_ids_to_collect: actor_ids_to_collect.iter().copied().collect(),
                                        table_ids_to_sync,
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(worker_id))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                config_override: actor.config_override.to_string(),
                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

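                    // Sending succeeded: the barrier now needs to be collected from this worker.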
                    node_need_collect.insert(*worker_id);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch(),
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

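    /// Ask every connected worker to create the partial graph identified by `database_id` and
    /// the optional creating job. Send failures are only logged with a warning.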
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
    ) {
        let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "failed to add partial graph to worker")
            }
        });
    }

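    /// Ask every connected worker to drop the partial graphs of the given creating jobs under
    /// `database_id`. Does nothing when `creating_job_ids` is empty; send failures are only
    /// logged with a warning.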
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<JobId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(database_id, Some(job_id)))
            .collect_vec();
        self.connected_workers().for_each(|(_, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

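    /// Ask every connected worker to reset the given partial graphs. Returns the set of workers
    /// that the reset request was successfully sent to; workers whose control stream send fails
    /// are logged and excluded from the result.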
    pub(super) fn reset_partial_graphs(
        &mut self,
        partial_graph_ids: Vec<PartialGraphId>,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(
                            streaming_control_stream_request::Request::ResetPartialGraphs(
                                ResetPartialGraphsRequest {
                                    partial_graph_ids: partial_graph_ids.clone(),
                                },
                            ),
                        ),
                    })
                    .is_err()
                {
                    warn!(%worker_id, node = ?node.host, "failed to send reset partial graphs request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
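    /// Open a new streaming control stream to `node`: fetch a stream client for the worker from
    /// the pool and start the control stream with the given init request.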
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

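/// Merge per-worker RPC errors into a single `MetaError`:
/// - no errors: an error built from `message` alone;
/// - a single error: returned as-is, with the worker id added as context;
/// - multiple errors where some carry a `Score`: the highest-scored one is returned;
/// - otherwise: all errors are concatenated into one message.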
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    // Build the error from a single worker error.
    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    // Find the error with the highest score.
    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    // The errors do not have scores, so simply concatenate them.
    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}

#[cfg(test)]
mod test_partial_graph_id {
    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};

    #[test]
    fn test_partial_graph_id_conversion() {
        let database_id = 233.into();
        let job_id = 233.into();
        assert_eq!(
            (database_id, None),
            from_partial_graph_id(to_partial_graph_id(database_id, None))
        );
        assert_eq!(
            (database_id, Some(job_id)),
            from_partial_graph_id(to_partial_graph_id(database_id, Some(job_id)))
        );
    }
}