risingwave_meta/barrier/
rpc.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::error::Error;
18use std::fmt::{Debug, Formatter};
19use std::future::poll_fn;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use anyhow::anyhow;
25use fail::fail_point;
26use futures::future::{BoxFuture, join_all};
27use futures::{FutureExt, StreamExt};
28use itertools::Itertools;
29use risingwave_common::bail;
30use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
31use risingwave_common::id::JobId;
32use risingwave_common::util::epoch::Epoch;
33use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
34use risingwave_common::util::tracing::TracingContext;
35use risingwave_connector::source::SplitImpl;
36use risingwave_meta_model::WorkerId;
37use risingwave_pb::common::{HostAddress, WorkerNode};
38use risingwave_pb::hummock::HummockVersionStats;
39use risingwave_pb::id::PartialGraphId;
40use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::stream_node::NodeBody;
43use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
44use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
45use risingwave_pb::stream_service::inject_barrier_request::{
46    BuildActorInfo, FragmentBuildActorInfo,
47};
48use risingwave_pb::stream_service::streaming_control_stream_request::{
49    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
50    RemovePartialGraphRequest, ResetPartialGraphsRequest,
51};
52use risingwave_pb::stream_service::{
53    InjectBarrierRequest, StreamingControlStreamRequest, streaming_control_stream_request,
54    streaming_control_stream_response,
55};
56use risingwave_rpc_client::StreamingControlHandle;
57use thiserror_ext::AsReport;
58use tokio::time::{Instant, sleep};
59use tokio_retry::strategy::ExponentialBackoff;
60use tracing::{debug, error, info, warn};
61use uuid::Uuid;
62
63use super::{BarrierKind, TracedEpoch};
64use crate::barrier::BackfillOrderState;
65use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
66use crate::barrier::cdc_progress::CdcTableBackfillTracker;
67use crate::barrier::checkpoint::{
68    BarrierWorkerState, BatchRefreshJobCheckpointControl, BatchRefreshRenderResult,
69    CreatingStreamingJobControl, DatabaseCheckpointControl, DatabaseCheckpointControlMetrics,
70    IndependentCheckpointJobControl,
71};
72use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
73use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
74use crate::barrier::info::{
75    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
76    SubscriberType,
77};
78use crate::barrier::partial_graph::PartialGraphRecoverer;
79use crate::barrier::progress::CreateMviewProgressTracker;
80use crate::barrier::utils::NodeToCollect;
81use crate::controller::fragment::InflightFragmentInfo;
82use crate::controller::utils::StreamingJobExtraInfo;
83use crate::manager::MetaSrvEnv;
84use crate::model::{
85    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
86    SubscriptionId,
87};
88use crate::stream::cdc::{
89    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
90};
91use crate::stream::{
92    ExtendedFragmentBackfillOrder, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
93    build_actor_connector_splits,
94};
95use crate::{MetaError, MetaResult};
96
97pub(super) fn to_partial_graph_id(
98    database_id: DatabaseId,
99    creating_job_id: Option<JobId>,
100) -> PartialGraphId {
101    let raw_job_id = creating_job_id
102        .map(|job_id| {
103            assert_ne!(job_id, u32::MAX);
104            job_id.as_raw_id()
105        })
106        .unwrap_or(u32::MAX);
107    (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
108}
109
110pub(super) fn from_partial_graph_id(
111    partial_graph_id: PartialGraphId,
112) -> (DatabaseId, Option<JobId>) {
113    let id = partial_graph_id.as_raw_id();
114    let database_id = (id >> 32) as u32;
115    let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
116    let creating_job_id = if raw_creating_job_id == u32::MAX {
117        None
118    } else {
119        Some(JobId::new(raw_creating_job_id))
120    };
121    (database_id.into(), creating_job_id)
122}
123
124pub(super) fn build_locality_fragment_state_table_mapping(
125    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
126) -> HashMap<FragmentId, Vec<TableId>> {
127    let mut mapping = HashMap::new();
128
129    for (fragment_id, fragment_info) in fragment_infos {
130        let mut state_table_ids = Vec::new();
131        visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
132            if let Some(NodeBody::LocalityProvider(locality_provider)) =
133                stream_node.node_body.as_ref()
134            {
135                let state_table_id = locality_provider
136                    .state_table
137                    .as_ref()
138                    .expect("must have state table")
139                    .id;
140                state_table_ids.push(state_table_id);
141                false
142            } else {
143                true
144            }
145        });
146        if !state_table_ids.is_empty() {
147            mapping.insert(*fragment_id, state_table_ids);
148        }
149    }
150
151    mapping
152}
153
154pub(super) fn database_partial_graphs<'a>(
155    database_id: DatabaseId,
156    creating_jobs: impl Iterator<Item = JobId> + Sized + 'a,
157) -> impl Iterator<Item = PartialGraphId> + 'a {
158    creating_jobs
159        .map(Some)
160        .chain([None])
161        .map(move |creating_job_id| to_partial_graph_id(database_id, creating_job_id))
162}
163
/// An established control stream to one worker node, together with the identity of that worker.
struct ControlStreamNode {
    /// Id of the worker on the other end of the stream (taken from `WorkerNode::id`).
    worker_id: WorkerId,
    /// Host address of the worker (taken from `WorkerNode::host`).
    host: HostAddress,
    /// Handle of the control stream; used to send requests and to poll its `response_stream`.
    handle: StreamingControlHandle,
}
169
/// Connection state of a worker tracked by [`ControlStreamManager`].
enum WorkerNodeState {
    /// A control stream to the worker is currently established.
    Connected {
        control_stream: ControlStreamNode,
        /// Set by `remove_worker`. When the response stream later yields an error, a worker
        /// with this flag set is dropped from the manager instead of being reconnected.
        removed: bool,
    },
    /// No control stream is available right now; the future resolves to a new handle once a
    /// connection attempt succeeds (it is built by `retry_connect`).
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}
177
/// Keeps the control stream of every tracked worker node and surfaces their responses and
/// reconnect events.
pub(super) struct ControlStreamManager {
    /// Tracked workers keyed by worker id, each with its node info and connection state.
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    /// Meta service environment handed in at construction; reused by `clear` to rebuild an
    /// empty manager.
    pub env: MetaSrvEnv,
}
182
183impl ControlStreamManager {
184    pub(super) fn new(env: MetaSrvEnv) -> Self {
185        Self {
186            workers: Default::default(),
187            env,
188        }
189    }
190
191    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
192        self.workers[&worker_id].0.host.clone().unwrap()
193    }
194
195    pub(super) async fn add_worker(
196        &mut self,
197        node: WorkerNode,
198        partial_graphs: impl Iterator<Item = PartialGraphId>,
199        term_id: &String,
200        context: &impl GlobalBarrierWorkerContext,
201    ) {
202        let node_id = node.id;
203        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
204            let (existing_node, worker_state) = entry.get();
205            assert_eq!(existing_node.host, node.host);
206            warn!(id = %node.id, host = ?node.host, "node already exists");
207            match worker_state {
208                WorkerNodeState::Connected { .. } => {
209                    warn!(id = %node.id, host = ?node.host, "new node already connected");
210                    return;
211                }
212                WorkerNodeState::Reconnecting(_) => {
213                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
214                    entry.remove();
215                }
216            }
217        }
218        let node_host = node.host.clone().unwrap();
219        let mut backoff = ExponentialBackoff::from_millis(100)
220            .max_delay(Duration::from_secs(3))
221            .factor(5);
222        const MAX_RETRY: usize = 5;
223        for i in 1..=MAX_RETRY {
224            match context
225                .new_control_stream(
226                    &node,
227                    &PbInitRequest {
228                        term_id: term_id.clone(),
229                    },
230                )
231                .await
232            {
233                Ok(mut handle) => {
234                    WorkerNodeConnected {
235                        handle: &mut handle,
236                        node: &node,
237                    }
238                    .initialize(partial_graphs);
239                    info!(?node_host, "add control stream worker");
240                    assert!(
241                        self.workers
242                            .insert(
243                                node_id,
244                                (
245                                    node,
246                                    WorkerNodeState::Connected {
247                                        control_stream: ControlStreamNode {
248                                            worker_id: node_id as _,
249                                            host: node_host,
250                                            handle,
251                                        },
252                                        removed: false
253                                    }
254                                )
255                            )
256                            .is_none()
257                    );
258                    return;
259                }
260                Err(e) => {
261                    // It may happen that the dns information of newly registered worker node
262                    // has not been propagated to the meta node and cause error. Wait for a while and retry
263                    let delay = backoff.next().unwrap();
264                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
265                    sleep(delay).await;
266                }
267            }
268        }
269        error!(?node_host, "fail to create worker node after retry");
270    }
271
272    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
273        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
274            let (_, worker_state) = entry.get_mut();
275            match worker_state {
276                WorkerNodeState::Connected { removed, .. } => {
277                    info!(worker_id = %node.id, "mark connected worker as removed");
278                    *removed = true;
279                }
280                WorkerNodeState::Reconnecting(_) => {
281                    info!(worker_id = %node.id, "remove worker");
282                    entry.remove();
283                }
284            }
285        }
286    }
287
288    fn retry_connect(
289        node: WorkerNode,
290        term_id: String,
291        context: Arc<impl GlobalBarrierWorkerContext>,
292    ) -> BoxFuture<'static, StreamingControlHandle> {
293        async move {
294            let mut attempt = 0;
295            let backoff = ExponentialBackoff::from_millis(100)
296                .max_delay(Duration::from_mins(1))
297                .factor(5);
298            let init_request = PbInitRequest { term_id };
299            for delay in backoff {
300                attempt += 1;
301                sleep(delay).await;
302                match context.new_control_stream(&node, &init_request).await {
303                    Ok(handle) => {
304                        return handle;
305                    }
306                    Err(e) => {
307                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
308                    }
309                }
310            }
311            unreachable!("end of retry backoff")
312        }.boxed()
313    }
314
315    pub(super) async fn recover(
316        env: MetaSrvEnv,
317        nodes: &HashMap<WorkerId, WorkerNode>,
318        term_id: &str,
319        context: Arc<impl GlobalBarrierWorkerContext>,
320    ) -> Self {
321        let reset_start_time = Instant::now();
322        let init_request = PbInitRequest {
323            term_id: term_id.to_owned(),
324        };
325        let init_request = &init_request;
326        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
327            let result = context.new_control_stream(node, init_request).await;
328            (*worker_id, node.clone(), result)
329        }))
330        .await;
331        let mut unconnected_workers = HashSet::new();
332        let mut workers = HashMap::new();
333        for (worker_id, node, result) in nodes {
334            match result {
335                Ok(handle) => {
336                    let control_stream = ControlStreamNode {
337                        worker_id: node.id,
338                        host: node.host.clone().unwrap(),
339                        handle,
340                    };
341                    assert!(
342                        workers
343                            .insert(
344                                worker_id,
345                                (
346                                    node,
347                                    WorkerNodeState::Connected {
348                                        control_stream,
349                                        removed: false
350                                    }
351                                )
352                            )
353                            .is_none()
354                    );
355                }
356                Err(e) => {
357                    unconnected_workers.insert(worker_id);
358                    warn!(
359                        e = %e.as_report(),
360                        %worker_id,
361                        ?node,
362                        "failed to connect to node"
363                    );
364                    assert!(
365                        workers
366                            .insert(
367                                worker_id,
368                                (
369                                    node.clone(),
370                                    WorkerNodeState::Reconnecting(Self::retry_connect(
371                                        node,
372                                        term_id.to_owned(),
373                                        context.clone()
374                                    ))
375                                )
376                            )
377                            .is_none()
378                    );
379                }
380            }
381        }
382
383        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");
384
385        Self { workers, env }
386    }
387
388    /// Clear all nodes and response streams in the manager.
389    pub(super) fn clear(&mut self) {
390        *self = Self::new(self.env.clone());
391    }
392}
393
/// Borrowed view of a worker whose control stream has just been established and still has to
/// be initialized via [`WorkerNodeConnected::initialize`].
pub(super) struct WorkerNodeConnected<'a> {
    /// Node info of the worker; only used for logging in `initialize`.
    node: &'a WorkerNode,
    /// Handle of the new control stream that the initial requests are sent through.
    handle: &'a mut StreamingControlHandle,
}
398
399impl<'a> WorkerNodeConnected<'a> {
400    pub(super) fn initialize(self, partial_graphs: impl Iterator<Item = PartialGraphId>) {
401        for partial_graph_id in partial_graphs {
402            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
403                request: Some(
404                    streaming_control_stream_request::Request::CreatePartialGraph(
405                        PbCreatePartialGraphRequest { partial_graph_id },
406                    ),
407                ),
408            }) {
409                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
410            }
411        }
412    }
413}
414
/// Event produced by polling the workers of a [`ControlStreamManager`].
pub(super) enum WorkerNodeEvent<'a> {
    /// A worker's response stream yielded a response, or an error (stream ended, transport
    /// error, empty/unexpected response, or the worker announced its shutdown).
    Response(MetaResult<streaming_control_stream_response::Response>),
    /// A worker that was reconnecting has a new control stream, which still needs to be
    /// initialized by the receiver of this event.
    Connected(WorkerNodeConnected<'a>),
}
419
420impl ControlStreamManager {
    /// Polls all tracked workers once and returns the first event that is ready.
    ///
    /// For a connected worker its response stream is polled; for a reconnecting worker the
    /// reconnect future is polled, but only when `poll_reconnect` is `true`.
    ///
    /// `this_opt` holds the manager behind an `Option` so that, when a `Connected` event is
    /// produced, the `&'a mut Self` can be moved out and a borrow with the full lifetime `'a`
    /// can be returned inside the event. After that, `this_opt` is `None`.
    ///
    /// `term_id` and `context` are only used to start a new `retry_connect` future when a
    /// connected worker's stream fails.
    ///
    /// Returns `Poll::Pending` when no worker is tracked or no worker has an event ready.
    ///
    /// # Panics
    ///
    /// Panics if called again after a `Connected` event has been returned (`this_opt` is
    /// empty then), or if a `CompleteBarrier` response carries a worker id different from
    /// the worker whose stream produced it.
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        // With no worker there is nothing to poll; this returns before `cx` is handed to
        // any stream or future.
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                // Resolve the control stream to poll; reconnecting workers either get
                // skipped or have their reconnect future polled instead.
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=%node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                // Take the `&'a mut Self` out of the option and look the
                                // worker up again, so the returned borrows live for `'a`
                                // instead of being tied to the loop's shorter reborrow.
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            // Normalize the stream item into `Ok(response)` or
                            // `Err((shutdown, error))`, where `shutdown` is `true` only when
                            // the worker itself announced that it is shutting down.
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                                .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable; an init response
                                                // on this stream is reported as an error.
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                // A barrier completion must come from the worker
                                                // that owns this stream.
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id);
                                                }
                                                Ok(resp)
                                            }
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    // A worker that was removed or is shutting down is
                                    // forgotten; any other failure starts a background
                                    // reconnect.
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }
535
536    #[await_tree::instrument("control_stream_next_event")]
537    pub(super) async fn next_event<'a>(
538        &'a mut self,
539        term_id: &str,
540        context: &Arc<impl GlobalBarrierWorkerContext>,
541    ) -> (WorkerId, WorkerNodeEvent<'a>) {
542        let mut this = Some(self);
543        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
544    }
545
546    #[await_tree::instrument("control_stream_next_response")]
547    pub(super) async fn next_response(
548        &mut self,
549        term_id: &str,
550        context: &Arc<impl GlobalBarrierWorkerContext>,
551    ) -> (
552        WorkerId,
553        MetaResult<streaming_control_stream_response::Response>,
554    ) {
555        let mut this = Some(self);
556        let (worker_id, event) =
557            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
558        match event {
559            WorkerNodeEvent::Response(result) => (worker_id, result),
560            WorkerNodeEvent::Connected(_) => {
561                unreachable!("set poll_reconnect=false")
562            }
563        }
564    }
565}
566
/// A database whose partial graphs have not all been reported as initialized yet.
///
/// It wraps the [`DatabaseCheckpointControl`] that `finish` hands out once
/// `initializing_partial_graphs` has become empty.
pub(super) struct DatabaseInitialBarrierCollector {
    /// Id of the database being initialized.
    pub(super) database_id: DatabaseId,
    /// Partial graphs still waiting for `partial_graph_initialized` to be called.
    pub(super) initializing_partial_graphs: HashSet<PartialGraphId>,
    /// Checkpoint control of the database, returned by `finish`.
    pub(super) database: DatabaseCheckpointControl,
}
572
573impl Debug for DatabaseInitialBarrierCollector {
574    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
575        f.debug_struct("DatabaseInitialBarrierCollector")
576            .field("database_id", &self.database_id)
577            .field("initializing_graphs", &self.initializing_partial_graphs)
578            .finish()
579    }
580}
581
582impl DatabaseInitialBarrierCollector {
583    pub(super) fn is_collected(&self) -> bool {
584        self.initializing_partial_graphs.is_empty()
585    }
586
587    pub(super) fn partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
588        assert!(self.initializing_partial_graphs.remove(&partial_graph_id));
589    }
590
591    pub(super) fn all_partial_graphs(&self) -> impl Iterator<Item = PartialGraphId> + '_ {
592        database_partial_graphs(
593            self.database_id,
594            self.database
595                .independent_checkpoint_job_controls
596                .keys()
597                .copied(),
598        )
599    }
600
601    pub(super) fn finish(self) -> DatabaseCheckpointControl {
602        assert!(self.is_collected());
603        self.database
604    }
605
606    pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
607        self.database.is_valid_after_worker_err(worker_id)
608    }
609}
610
611impl PartialGraphRecoverer<'_> {
612    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
613    #[expect(clippy::too_many_arguments)]
614    pub(super) fn inject_database_initial_barrier(
615        &mut self,
616        database_id: DatabaseId,
617        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
618        job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
619        state_table_committed_epochs: &mut HashMap<TableId, u64>,
620        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
621        fragment_relations: &FragmentDownstreamRelation,
622        stream_actors: &HashMap<ActorId, StreamActor>,
623        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
624        background_jobs: &mut HashSet<JobId>,
625        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
626        is_paused: bool,
627        hummock_version_stats: &HummockVersionStats,
628        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
629        batch_refresh: HashMap<JobId, BatchRefreshRenderResult>,
630    ) -> MetaResult<DatabaseCheckpointControl> {
631        fn collect_source_splits(
632            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
633            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
634        ) -> HashMap<ActorId, Vec<SplitImpl>> {
635            fragment_infos
636                .flat_map(|info| info.actors.keys())
637                .filter_map(|actor_id| {
638                    let actor_id = *actor_id as ActorId;
639                    source_splits
640                        .remove(&actor_id)
641                        .map(|splits| (actor_id, splits))
642                })
643                .collect()
644        }
        /// Builds the `Add` mutation carried by the initial barrier.
        ///
        /// It contains the given source splits and CDC table snapshot split assignment, the
        /// pause flag, and the backfill nodes derived from `backfill_orders`; all other
        /// fields are left at their defaults.
        fn build_mutation(
            splits: &HashMap<ActorId, Vec<SplitImpl>>,
            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
            backfill_orders: &ExtendedFragmentBackfillOrder,
            is_paused: bool,
        ) -> Mutation {
            let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
                .into_iter()
                .collect();
            Mutation::Add(AddMutation {
                // Actors built during recovery are not treated as newly added actors.
                actor_dispatchers: Default::default(),
                added_actors: Default::default(),
                actor_splits: build_actor_connector_splits(splits),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: cdc_table_snapshot_split_assignment,
                }),
                pause: is_paused,
                subscriptions_to_add: Default::default(),
                backfill_nodes_to_pause,
                new_upstream_sinks: Default::default(),
            })
        }
668
669        fn resolve_jobs_committed_epoch(
670            state_table_committed_epochs: &mut HashMap<TableId, u64>,
671            table_ids: impl Iterator<Item = TableId>,
672        ) -> u64 {
673            let mut epochs = table_ids.map(|table_id| {
674                (
675                    table_id,
676                    state_table_committed_epochs
677                        .remove(&table_id)
678                        .expect("should exist"),
679                )
680            });
681            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
682            for (table_id, epoch) in epochs {
683                assert_eq!(
684                    prev_epoch, epoch,
685                    "{} has different committed epoch to {}",
686                    first_table_id, table_id
687                );
688            }
689            prev_epoch
690        }
691        fn job_backfill_orders(
692            job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
693            job_id: JobId,
694        ) -> UserDefinedFragmentBackfillOrder {
695            UserDefinedFragmentBackfillOrder::new(
696                job_extra_info
697                    .get(&job_id)
698                    .and_then(|info| info.backfill_orders.clone())
699                    .map_or_else(HashMap::new, |orders| orders.0),
700            )
701        }
702
703        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
704            .keys()
705            .filter_map(|job_id| {
706                mv_depended_subscriptions
707                    .remove(&job_id.as_mv_table_id())
708                    .map(|subscriptions| {
709                        (
710                            job_id.as_mv_table_id(),
711                            subscriptions
712                                .into_iter()
713                                .map(|(subscription_id, retention)| {
714                                    (
715                                        subscription_id.as_subscriber_id(),
716                                        SubscriberType::Subscription(retention),
717                                    )
718                                })
719                                .collect(),
720                        )
721                    })
722            })
723            .collect();
724
725        let mut database_jobs = HashMap::new();
726        let mut snapshot_backfill_jobs = HashMap::new();
727
728        for (job_id, job_fragments) in jobs {
729            if background_jobs.remove(&job_id) {
730                if job_fragments.values().any(|fragment| {
731                    fragment
732                        .fragment_type_mask
733                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
734                }) {
735                    debug!(%job_id, "recovered snapshot backfill job");
736                    snapshot_backfill_jobs.insert(job_id, job_fragments);
737                } else {
738                    database_jobs.insert(job_id, (job_fragments, true));
739                }
740            } else {
741                database_jobs.insert(job_id, (job_fragments, false));
742            }
743        }
744
745        let database_job_log_epochs: HashMap<_, _> = database_jobs
746            .keys()
747            .filter_map(|job_id| {
748                state_table_log_epochs
749                    .remove(&job_id.as_mv_table_id())
750                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
751            })
752            .collect();
753
754        let prev_epoch = resolve_jobs_committed_epoch(
755            state_table_committed_epochs,
756            InflightFragmentInfo::existing_table_ids(
757                database_jobs.values().flat_map(|(job, _)| job.values()),
758            ),
759        );
760        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
761        // Use a different `curr_epoch` for each recovery attempt.
762        let curr_epoch = prev_epoch.next();
763        let barrier_info = BarrierInfo {
764            prev_epoch,
765            curr_epoch,
766            kind: BarrierKind::Initial,
767        };
768
769        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
770        for (job_id, fragment_infos) in snapshot_backfill_jobs {
771            let committed_epoch = resolve_jobs_committed_epoch(
772                state_table_committed_epochs,
773                InflightFragmentInfo::existing_table_ids(fragment_infos.values()),
774            );
775            if committed_epoch == barrier_info.prev_epoch() {
776                info!(
777                    "recovered creating snapshot backfill job {} catch up with upstream already",
778                    job_id
779                );
780                database_jobs
781                    .try_insert(job_id, (fragment_infos, true))
782                    .expect("non-duplicate");
783                continue;
784            }
785            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
786                fragment_infos
787                    .values()
788                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
789            )?
790            .0
791            .ok_or_else(|| {
792                anyhow!(
793                    "recovered snapshot backfill job {} has no snapshot backfill info",
794                    job_id
795                )
796            })?;
797            let mut snapshot_epoch = None;
798            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
799                .upstream_mv_table_id_to_backfill_epoch
800                .keys()
801                .cloned()
802                .collect();
803            for (upstream_table_id, epoch) in
804                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
805            {
806                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
807                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
808                if *snapshot_epoch != epoch {
809                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
810                }
811            }
812            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
813                anyhow!(
814                    "snapshot backfill job {} has not set snapshot epoch",
815                    job_id
816                )
817            })?;
818            for upstream_table_id in &upstream_table_ids {
819                subscribers
820                    .entry(*upstream_table_id)
821                    .or_default()
822                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
823                    .expect("non-duplicate");
824            }
825            ongoing_snapshot_backfill_jobs
826                .try_insert(
827                    job_id,
828                    (
829                        fragment_infos,
830                        upstream_table_ids,
831                        committed_epoch,
832                        snapshot_epoch,
833                    ),
834                )
835                .expect("non-duplicated");
836        }
837
838        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
839            HashMap::new();
840
841        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
842            database_jobs
843                .into_iter()
844                .map(|(job_id, (fragment_infos, is_background_creating))| {
845                    let status = if is_background_creating {
846                        let backfill_ordering = job_backfill_orders(job_extra_info, job_id);
847                        let backfill_ordering = StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
848                            backfill_ordering,
849                            fragment_relations,
850                            || fragment_infos.iter().map(|(fragment_id, fragment)| {
851                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
852                        }));
853                        let locality_fragment_state_table_mapping =
854                            build_locality_fragment_state_table_mapping(&fragment_infos);
855                        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
856                            &backfill_ordering,
857                            &fragment_infos,
858                            locality_fragment_state_table_mapping,
859                        );
860                        CreateStreamingJobStatus::Creating {
861                            tracker: CreateMviewProgressTracker::recover(
862                                job_id,
863                                &fragment_infos,
864                                backfill_order_state,
865                                hummock_version_stats,
866                            ),
867                        }
868                    } else {
869                        CreateStreamingJobStatus::Created
870                    };
871                    let cdc_table_backfill_tracker =
872                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
873                            let cdc_fragment = fragment_infos
874                                .values()
875                                .find(|fragment| {
876                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
877                                        fragment.fragment_type_mask,
878                                        &fragment.nodes,
879                                    )
880                                    .is_some()
881                                })
882                                .expect("should have parallel cdc fragment");
883                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
884                            let mut tracker =
885                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
886                            cdc_table_snapshot_split_assignment
887                                .extend(tracker.reassign_splits(cdc_actors)?);
888                            Some(tracker)
889                        } else {
890                            None
891                        };
892                    Ok((
893                        job_id,
894                        InflightStreamingJobInfo {
895                            job_id,
896                            fragment_infos,
897                            subscribers: subscribers
898                                .remove(&job_id.as_mv_table_id())
899                                .unwrap_or_default(),
900                            status,
901                            cdc_table_backfill_tracker,
902                        },
903                    ))
904                })
905                .try_collect::<_, _, MetaError>()
906        }?;
907
908        let control_stream_manager = self.control_stream_manager();
909        let mut builder = FragmentEdgeBuilder::new(
910            database_jobs
911                .values()
912                .flat_map(|job| {
913                    let partial_graph_id = to_partial_graph_id(database_id, None);
914                    job.fragment_infos().map(move |info| {
915                        (
916                            info.fragment_id,
917                            EdgeBuilderFragmentInfo::from_inflight(
918                                info,
919                                partial_graph_id,
920                                control_stream_manager,
921                            ),
922                        )
923                    })
924                })
925                .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
926                    |(job_id, (fragments, ..))| {
927                        let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
928                        fragments.values().map(move |fragment| {
929                            (
930                                fragment.fragment_id,
931                                EdgeBuilderFragmentInfo::from_inflight(
932                                    fragment,
933                                    partial_graph_id,
934                                    control_stream_manager,
935                                ),
936                            )
937                        })
938                    },
939                )),
940        );
941        builder.add_relations(fragment_relations);
942        let mut edges = builder.build();
943
944        {
945            let new_actors =
946                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
947                    job.fragment_infos.values().map(move |fragment_infos| {
948                        (
949                            fragment_infos.fragment_id,
950                            &fragment_infos.nodes,
951                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
952                                (
953                                    stream_actors.get(actor_id).expect("should exist"),
954                                    actor.worker_id,
955                                )
956                            }),
957                            job.subscribers.keys().copied(),
958                        )
959                    })
960                }));
961
962            let nodes_actors =
963                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
964            let database_job_source_splits =
965                collect_source_splits(database_jobs.values().flatten(), source_splits);
966            let database_backfill_orders =
967                UserDefinedFragmentBackfillOrder::merge(database_jobs.values().map(|job| {
968                    if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
969                        job_backfill_orders(job_extra_info, job.job_id)
970                    } else {
971                        UserDefinedFragmentBackfillOrder::default()
972                    }
973                }));
974            let database_backfill_orders =
975                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
976                    database_backfill_orders,
977                    fragment_relations,
978                    || {
979                        database_jobs.values().flat_map(|job_fragments| {
980                            job_fragments
981                                .fragment_infos
982                                .iter()
983                                .map(|(fragment_id, fragment)| {
984                                    (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
985                                })
986                        })
987                    },
988                );
989            let mutation = build_mutation(
990                &database_job_source_splits,
991                cdc_table_snapshot_split_assignment,
992                &database_backfill_orders,
993                is_paused,
994            );
995
996            let partial_graph_id = to_partial_graph_id(database_id, None);
997            self.recover_graph(
998                partial_graph_id,
999                mutation,
1000                &barrier_info,
1001                &nodes_actors,
1002                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
1003                new_actors,
1004                DatabaseCheckpointControlMetrics::new(database_id),
1005            )?;
1006            debug!(
1007                %database_id,
1008                "inject initial barrier"
1009            );
1010        };
1011
1012        let mut independent_checkpoint_job_controls: HashMap<
1013            JobId,
1014            IndependentCheckpointJobControl,
1015        > = HashMap::new();
1016        for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
1017            ongoing_snapshot_backfill_jobs
1018        {
1019            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
1020                (
1021                    fragment_infos.fragment_id,
1022                    &fragment_infos.nodes,
1023                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
1024                        (
1025                            stream_actors.get(actor_id).expect("should exist"),
1026                            actor.worker_id,
1027                        )
1028                    }),
1029                    vec![], // no subscribers for backfilling jobs,
1030                )
1031            }));
1032
1033            let database_job_source_splits =
1034                collect_source_splits(database_jobs.values().flatten(), source_splits);
1035            assert!(
1036                !cdc_table_snapshot_splits.contains_key(&job_id),
1037                "snapshot backfill job {job_id} should not have cdc backfill"
1038            );
1039            if is_paused {
1040                bail!("should not pause when having snapshot backfill job {job_id}");
1041            }
1042            let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
1043            let job_backfill_orders =
1044                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
1045                    job_backfill_orders,
1046                    fragment_relations,
1047                    || {
1048                        info.iter().map(|(fragment_id, fragment)| {
1049                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
1050                        })
1051                    },
1052                );
1053            let mutation = build_mutation(
1054                &database_job_source_splits,
1055                Default::default(), // no cdc backfill job for
1056                &job_backfill_orders,
1057                false,
1058            );
1059
1060            let job = CreatingStreamingJobControl::recover(
1061                database_id,
1062                job_id,
1063                upstream_table_ids,
1064                &database_job_log_epochs,
1065                snapshot_epoch,
1066                committed_epoch,
1067                &barrier_info,
1068                info,
1069                job_backfill_orders,
1070                fragment_relations,
1071                hummock_version_stats,
1072                node_actors,
1073                mutation.clone(),
1074                self,
1075            )?;
1076            independent_checkpoint_job_controls.insert(
1077                job_id,
1078                IndependentCheckpointJobControl::CreatingStreamingJob(job),
1079            );
1080        }
1081
1082        // Recover batch refresh jobs (both idle and consuming snapshot).
1083        // Actors were already rendered by `render_runtime_info()`.
1084        for (job_id, render_result) in batch_refresh {
1085            background_jobs.remove(&job_id);
1086            debug!(%job_id, "recovered batch refresh job");
1087
1088            // Resolve committed epoch from state tables.
1089            let committed_epoch = resolve_jobs_committed_epoch(
1090                state_table_committed_epochs,
1091                InflightFragmentInfo::existing_table_ids(render_result.fragment_infos.values()),
1092            );
1093
1094            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
1095                render_result
1096                    .fragment_infos
1097                    .values()
1098                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
1099            )?
1100            .0
1101            .ok_or_else(|| anyhow!("batch refresh job {} has no snapshot backfill info", job_id))?;
1102
1103            let upstream_table_ids: HashSet<TableId> = snapshot_backfill_info
1104                .upstream_mv_table_id_to_backfill_epoch
1105                .keys()
1106                .copied()
1107                .collect();
1108            let snapshot_epoch = snapshot_backfill_info
1109                .upstream_mv_table_id_to_backfill_epoch
1110                .values()
1111                .find_map(|e| *e)
1112                .unwrap_or(committed_epoch);
1113
1114            // Register subscribers for the upstream tables.
1115            for upstream_table_id in &upstream_table_ids {
1116                subscribers
1117                    .entry(*upstream_table_id)
1118                    .or_default()
1119                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
1120                    .expect("non-duplicate");
1121            }
1122
1123            let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
1124            let job_backfill_orders =
1125                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
1126                    job_backfill_orders,
1127                    fragment_relations,
1128                    || {
1129                        render_result
1130                            .fragment_infos
1131                            .iter()
1132                            .map(|(fid, f)| (*fid, f.fragment_type_mask, &f.nodes))
1133                    },
1134                );
1135            let mutation = build_mutation(
1136                &Default::default(), // batch refresh has no source splits
1137                Default::default(),
1138                &job_backfill_orders,
1139                false,
1140            );
1141
1142            let job = BatchRefreshJobCheckpointControl::recover(
1143                database_id,
1144                job_id,
1145                upstream_table_ids,
1146                snapshot_epoch,
1147                committed_epoch,
1148                job_backfill_orders,
1149                hummock_version_stats,
1150                mutation,
1151                render_result,
1152                self,
1153            )?;
1154            independent_checkpoint_job_controls
1155                .insert(job_id, IndependentCheckpointJobControl::BatchRefresh(job));
1156        }
1157
1158        self.control_stream_manager()
1159            .env
1160            .shared_actor_infos()
1161            .recover_database(
1162                database_id,
1163                database_jobs
1164                    .values()
1165                    .flat_map(|info| {
1166                        info.fragment_infos()
1167                            .map(move |fragment| (fragment, info.job_id))
1168                    })
1169                    .chain(
1170                        independent_checkpoint_job_controls
1171                            .iter()
1172                            .flat_map(|(job_id, job)| {
1173                                let job_id = *job_id;
1174                                job.fragment_infos()
1175                                    .into_iter()
1176                                    .flat_map(move |infos| infos.values().map(move |f| (f, job_id)))
1177                            }),
1178                    ),
1179            );
1180
1181        let committed_epoch = barrier_info.prev_epoch();
1182        let new_epoch = barrier_info.curr_epoch;
1183        let database_info = InflightDatabaseInfo::recover(
1184            database_id,
1185            database_jobs.into_values(),
1186            self.control_stream_manager()
1187                .env
1188                .shared_actor_infos()
1189                .clone(),
1190        );
1191        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
1192        Ok(DatabaseCheckpointControl::recovery(
1193            database_id,
1194            database_state,
1195            committed_epoch,
1196            database_info,
1197            independent_checkpoint_job_controls,
1198        ))
1199    }
1200}
1201
1202impl ControlStreamManager {
1203    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
1204        self.workers
1205            .iter()
1206            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
1207                WorkerNodeState::Connected { control_stream, .. } => {
1208                    Some((*worker_id, control_stream))
1209                }
1210                WorkerNodeState::Reconnecting(_) => None,
1211            })
1212    }
1213
1214    pub(super) fn inject_barrier(
1215        &mut self,
1216        partial_graph_id: PartialGraphId,
1217        mutation: Option<Mutation>,
1218        barrier_info: &BarrierInfo,
1219        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
1220        table_ids_to_sync: impl Iterator<Item = TableId>,
1221        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
1222        mut new_actors: Option<StreamJobActorsToCreate>,
1223    ) -> MetaResult<NodeToCollect> {
1224        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
1225            "inject_barrier_err"
1226        ));
1227
1228        let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();
1229
1230        nodes_to_sync_table.iter().for_each(|worker_id| {
1231            assert!(node_actors.contains_key(worker_id), "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}");
1232        });
1233
1234        let mut node_need_collect = NodeToCollect::new();
1235        let table_ids_to_sync = table_ids_to_sync.collect_vec();
1236
1237        node_actors.iter()
1238            .try_for_each(|(worker_id, actor_ids_to_collect)| {
1239                assert!(!actor_ids_to_collect.is_empty(), "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}");
1240                let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
1241                    table_ids_to_sync.clone()
1242                } else {
1243                    vec![]
1244                };
1245
1246                let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
1247                    &&
1248                    let WorkerNodeState::Connected { control_stream, .. } = worker_state
1249                {
1250                    control_stream
1251                } else {
1252                    return Err(anyhow!("unconnected worker node {}", worker_id).into());
1253                };
1254
1255                {
1256                    let mutation = mutation.clone();
1257                    let barrier = Barrier {
1258                        epoch: Some(risingwave_pb::data::Epoch {
1259                            curr: barrier_info.curr_epoch(),
1260                            prev: barrier_info.prev_epoch(),
1261                        }),
1262                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
1263                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
1264                            .to_protobuf(),
1265                        kind: barrier_info.kind.to_protobuf() as i32,
1266                    };
1267
1268                    node.handle
1269                        .request_sender
1270                        .send(StreamingControlStreamRequest {
1271                            request: Some(
1272                                streaming_control_stream_request::Request::InjectBarrier(
1273                                    InjectBarrierRequest {
1274                                        request_id: Uuid::new_v4().to_string(),
1275                                        barrier: Some(barrier),
1276                                        actor_ids_to_collect: actor_ids_to_collect.iter().copied().collect(),
1277                                        table_ids_to_sync,
1278                                        partial_graph_id,
1279                                        actors_to_build: new_actors
1280                                            .as_mut()
1281                                            .map(|new_actors| new_actors.remove(worker_id))
1282                                            .into_iter()
1283                                            .flatten()
1284                                            .flatten()
1285                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
1286                                                FragmentBuildActorInfo {
1287                                                    fragment_id,
1288                                                    node: Some(node),
1289                                                    actors: actors
1290                                                        .into_iter()
1291                                                        .map(|(actor, upstreams, dispatchers)| {
1292                                                            BuildActorInfo {
1293                                                                actor_id: actor.actor_id,
1294                                                                fragment_upstreams: upstreams
1295                                                                    .into_iter()
1296                                                                    .map(|(fragment_id, upstreams)| {
1297                                                                        (
1298                                                                            fragment_id,
1299                                                                            UpstreamActors {
1300                                                                                actors: upstreams
1301                                                                                    .into_values()
1302                                                                                    .collect(),
1303                                                                            },
1304                                                                        )
1305                                                                    })
1306                                                                    .collect(),
1307                                                                dispatchers,
1308                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
1309                                                                mview_definition: actor.mview_definition,
1310                                                                expr_context: actor.expr_context,
1311                                                                config_override: actor.config_override.to_string(),
1312                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
1313                                                            }
1314                                                        })
1315                                                        .collect(),
1316                                                }
1317                                            })
1318                                            .collect(),
1319                                    },
1320                                ),
1321                            ),
1322                        })
1323                        .map_err(|_| {
1324                            MetaError::from(anyhow!(
1325                                "failed to send request to {} {:?}",
1326                                node.worker_id,
1327                                node.host
1328                            ))
1329                        })?;
1330
1331                    node_need_collect.insert(*worker_id);
1332                    Result::<_, MetaError>::Ok(())
1333                }
1334            })
1335            .inspect_err(|e| {
1336                // Record failure in event log.
1337                use risingwave_pb::meta::event_log;
1338                let event = event_log::EventInjectBarrierFail {
1339                    prev_epoch: barrier_info.prev_epoch(),
1340                    cur_epoch: barrier_info.curr_epoch(),
1341                    error: e.to_report_string(),
1342                };
1343                self.env
1344                    .event_log_manager_ref()
1345                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
1346            })?;
1347        Ok(node_need_collect)
1348    }
1349
1350    pub(super) fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
1351        self.connected_workers().for_each(|(_, node)| {
1352            if node
1353                .handle
1354                .request_sender
1355                .send(StreamingControlStreamRequest {
1356                    request: Some(
1357                        streaming_control_stream_request::Request::CreatePartialGraph(
1358                            CreatePartialGraphRequest {
1359                                partial_graph_id,
1360                            },
1361                        ),
1362                    ),
1363                }).is_err() {
1364                let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
1365                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker")
1366            }
1367        });
1368    }
1369
1370    pub(super) fn remove_partial_graphs(&mut self, partial_graph_ids: Vec<PartialGraphId>) {
1371        self.connected_workers().for_each(|(_, node)| {
1372            if node.handle
1373                .request_sender
1374                .send(StreamingControlStreamRequest {
1375                    request: Some(
1376                        streaming_control_stream_request::Request::RemovePartialGraph(
1377                            RemovePartialGraphRequest {
1378                                partial_graph_ids: partial_graph_ids.clone(),
1379                            },
1380                        ),
1381                    ),
1382                })
1383                .is_err()
1384            {
1385                warn!(worker_id = %node.worker_id,node = ?node.host,"failed to send remove partial graph request");
1386            }
1387        })
1388    }
1389
1390    pub(super) fn reset_partial_graphs(
1391        &mut self,
1392        partial_graph_ids: Vec<PartialGraphId>,
1393    ) -> HashSet<WorkerId> {
1394        self.connected_workers()
1395            .filter_map(|(worker_id, node)| {
1396                if node
1397                    .handle
1398                    .request_sender
1399                    .send(StreamingControlStreamRequest {
1400                        request: Some(
1401                            streaming_control_stream_request::Request::ResetPartialGraphs(
1402                                ResetPartialGraphsRequest {
1403                                    partial_graph_ids: partial_graph_ids.clone(),
1404                                },
1405                            ),
1406                        ),
1407                    })
1408                    .is_err()
1409                {
1410                    warn!(%worker_id, node = ?node.host,"failed to send reset database request");
1411                    None
1412                } else {
1413                    Some(worker_id)
1414                }
1415            })
1416            .collect()
1417    }
1418}
1419
1420impl GlobalBarrierWorkerContextImpl {
1421    pub(super) async fn new_control_stream_impl(
1422        &self,
1423        node: &WorkerNode,
1424        init_request: &PbInitRequest,
1425    ) -> MetaResult<StreamingControlHandle> {
1426        let handle = self
1427            .env
1428            .stream_client_pool()
1429            .get(node)
1430            .await?
1431            .start_streaming_control(init_request.clone())
1432            .await?;
1433        Ok(handle)
1434    }
1435}
1436
1437pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
1438    message: &str,
1439    errors: impl IntoIterator<Item = (WorkerId, E)>,
1440) -> MetaError {
1441    use std::fmt::Write;
1442
1443    use risingwave_common::error::error_request_copy;
1444    use risingwave_common::error::tonic::extra::Score;
1445
1446    let errors = errors.into_iter().collect_vec();
1447
1448    if errors.is_empty() {
1449        return anyhow!(message.to_owned()).into();
1450    }
1451
1452    // Create the error from the single error.
1453    let single_error = |(worker_id, e)| {
1454        anyhow::Error::from(e)
1455            .context(format!("{message}, in worker node {worker_id}"))
1456            .into()
1457    };
1458
1459    if errors.len() == 1 {
1460        return single_error(errors.into_iter().next().unwrap());
1461    }
1462
1463    // Find the error with the highest score.
1464    let max_score = errors
1465        .iter()
1466        .filter_map(|(_, e)| error_request_copy::<Score>(e))
1467        .max();
1468
1469    if let Some(max_score) = max_score {
1470        let mut errors = errors;
1471        let max_scored = errors
1472            .extract_if(.., |(_, e)| {
1473                error_request_copy::<Score>(e) == Some(max_score)
1474            })
1475            .next()
1476            .unwrap();
1477
1478        return single_error(max_scored);
1479    }
1480
1481    // The errors do not have scores, so simply concatenate them.
1482    let concat: String = errors
1483        .into_iter()
1484        .fold(format!("{message}: "), |mut s, (w, e)| {
1485            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
1486            s
1487        });
1488    anyhow!(concat).into()
1489}
1490
#[cfg(test)]
mod test_partial_graph_id {
    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};

    /// Checks that `from_partial_graph_id` inverts `to_partial_graph_id`, both
    /// without and with a creating job id.
    #[test]
    fn test_partial_graph_id_conversion() {
        let database_id = 233.into();
        let job_id = 233.into();
        for creating_job_id in [None, Some(job_id)] {
            let round_tripped =
                from_partial_graph_id(to_partial_graph_id(database_id, creating_job_id));
            assert_eq!((database_id, creating_job_id), round_tripped);
        }
    }
}