Skip to main content

risingwave_meta/barrier/
rpc.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::error::Error;
18use std::fmt::{Debug, Formatter};
19use std::future::poll_fn;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use anyhow::anyhow;
25use fail::fail_point;
26use futures::future::{BoxFuture, join_all};
27use futures::{FutureExt, StreamExt};
28use itertools::Itertools;
29use risingwave_common::bail;
30use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
31use risingwave_common::id::JobId;
32use risingwave_common::util::epoch::Epoch;
33use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
34use risingwave_common::util::tracing::TracingContext;
35use risingwave_connector::source::SplitImpl;
36use risingwave_meta_model::WorkerId;
37use risingwave_pb::common::{HostAddress, WorkerNode};
38use risingwave_pb::hummock::HummockVersionStats;
39use risingwave_pb::id::PartialGraphId;
40use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::stream_node::NodeBody;
43use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
44use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
45use risingwave_pb::stream_service::inject_barrier_request::{
46    BuildActorInfo, FragmentBuildActorInfo,
47};
48use risingwave_pb::stream_service::streaming_control_stream_request::{
49    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
50    RemovePartialGraphRequest, ResetPartialGraphsRequest,
51};
52use risingwave_pb::stream_service::{
53    InjectBarrierRequest, StreamingControlStreamRequest, streaming_control_stream_request,
54    streaming_control_stream_response,
55};
56use risingwave_rpc_client::StreamingControlHandle;
57use thiserror_ext::AsReport;
58use tokio::time::{Instant, sleep};
59use tokio_retry::strategy::ExponentialBackoff;
60use tracing::{debug, error, info, warn};
61use uuid::Uuid;
62
63use super::{BarrierKind, TracedEpoch};
64use crate::barrier::BackfillOrderState;
65use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
66use crate::barrier::cdc_progress::CdcTableBackfillTracker;
67use crate::barrier::checkpoint::{
68    BarrierWorkerState, BatchRefreshJobCheckpointControl, BatchRefreshRenderResult,
69    CreatingStreamingJobControl, DatabaseCheckpointControl, DatabaseCheckpointControlMetrics,
70    IndependentCheckpointJobControl,
71};
72use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
73use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
74use crate::barrier::info::{
75    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
76    SubscriberType,
77};
78use crate::barrier::partial_graph::PartialGraphRecoverer;
79use crate::barrier::progress::CreateMviewProgressTracker;
80use crate::barrier::utils::NodeToCollect;
81use crate::controller::fragment::InflightFragmentInfo;
82use crate::controller::utils::StreamingJobExtraInfo;
83use crate::manager::MetaSrvEnv;
84use crate::model::{
85    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
86    SubscriptionId,
87};
88use crate::stream::cdc::{
89    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
90};
91use crate::stream::{
92    ExtendedFragmentBackfillOrder, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
93    build_actor_connector_splits,
94};
95use crate::{MetaError, MetaResult};
96
97pub(super) fn to_partial_graph_id(
98    database_id: DatabaseId,
99    creating_job_id: Option<JobId>,
100) -> PartialGraphId {
101    let raw_job_id = creating_job_id
102        .map(|job_id| {
103            assert_ne!(job_id, u32::MAX);
104            job_id.as_raw_id()
105        })
106        .unwrap_or(u32::MAX);
107    (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
108}
109
110pub(super) fn from_partial_graph_id(
111    partial_graph_id: PartialGraphId,
112) -> (DatabaseId, Option<JobId>) {
113    let id = partial_graph_id.as_raw_id();
114    let database_id = (id >> 32) as u32;
115    let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
116    let creating_job_id = if raw_creating_job_id == u32::MAX {
117        None
118    } else {
119        Some(JobId::new(raw_creating_job_id))
120    };
121    (database_id.into(), creating_job_id)
122}
123
124pub(super) fn build_locality_fragment_state_table_mapping(
125    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
126) -> HashMap<FragmentId, Vec<TableId>> {
127    let mut mapping = HashMap::new();
128
129    for (fragment_id, fragment_info) in fragment_infos {
130        let mut state_table_ids = Vec::new();
131        visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
132            if let Some(NodeBody::LocalityProvider(locality_provider)) =
133                stream_node.node_body.as_ref()
134            {
135                let state_table_id = locality_provider
136                    .state_table
137                    .as_ref()
138                    .expect("must have state table")
139                    .id;
140                state_table_ids.push(state_table_id);
141                false
142            } else {
143                true
144            }
145        });
146        if !state_table_ids.is_empty() {
147            mapping.insert(*fragment_id, state_table_ids);
148        }
149    }
150
151    mapping
152}
153
154pub(super) fn database_partial_graphs<'a>(
155    database_id: DatabaseId,
156    creating_jobs: impl Iterator<Item = JobId> + Sized + 'a,
157) -> impl Iterator<Item = PartialGraphId> + 'a {
158    creating_jobs
159        .map(Some)
160        .chain([None])
161        .map(move |creating_job_id| to_partial_graph_id(database_id, creating_job_id))
162}
163
/// An established control stream to a single worker.
struct ControlStreamNode {
    /// Id of the worker this stream is connected to.
    worker_id: WorkerId,
    /// Address of the worker, copied from its `WorkerNode` when the stream was created.
    host: HostAddress,
    /// Handle used to send requests to the worker and to poll its response stream.
    handle: StreamingControlHandle,
}
169
/// Connection state of a worker tracked by `ControlStreamManager`.
enum WorkerNodeState {
    /// A live control stream is open to the worker.
    Connected {
        control_stream: ControlStreamNode,
        /// Set by `remove_worker`. When set, the worker is dropped from the manager, instead of
        /// being reconnected, the next time its response stream yields an error.
        removed: bool,
    },
    /// The control stream is down. The boxed future resolves to a new handle once a
    /// reconnection succeeds (see `retry_connect`).
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}
177
/// Owns the control streams from the meta node to each tracked worker. It also owns the
/// background reconnection futures of workers whose stream has failed.
pub(super) struct ControlStreamManager {
    /// Every tracked worker's node info, paired with its current connection state.
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}
182
183impl ControlStreamManager {
184    pub(super) fn new(env: MetaSrvEnv) -> Self {
185        Self {
186            workers: Default::default(),
187            env,
188        }
189    }
190
191    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
192        self.workers[&worker_id].0.host.clone().unwrap()
193    }
194
195    pub(super) async fn add_worker(
196        &mut self,
197        node: WorkerNode,
198        partial_graphs: impl Iterator<Item = PartialGraphId>,
199        term_id: &String,
200        context: Arc<impl GlobalBarrierWorkerContext>,
201    ) {
202        let node_id = node.id;
203        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
204            let (existing_node, worker_state) = entry.get();
205            assert_eq!(existing_node.host, node.host);
206            warn!(id = %node.id, host = ?node.host, "node already exists");
207            match worker_state {
208                WorkerNodeState::Connected { .. } => {
209                    warn!(id = %node.id, host = ?node.host, "new node already connected");
210                    return;
211                }
212                WorkerNodeState::Reconnecting(_) => {
213                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
214                    entry.remove();
215                }
216            }
217        }
218        let node_host = node.host.clone().unwrap();
219        let mut backoff = ExponentialBackoff::from_millis(100)
220            .max_delay(Duration::from_secs(3))
221            .factor(5);
222        const MAX_RETRY: usize = 5;
223        for i in 1..=MAX_RETRY {
224            match context
225                .new_control_stream(
226                    &node,
227                    &PbInitRequest {
228                        term_id: term_id.clone(),
229                    },
230                )
231                .await
232            {
233                Ok(mut handle) => {
234                    WorkerNodeConnected {
235                        handle: &mut handle,
236                        node: &node,
237                    }
238                    .initialize(partial_graphs);
239                    info!(?node_host, "add control stream worker");
240                    assert!(
241                        self.workers
242                            .insert(
243                                node_id,
244                                (
245                                    node,
246                                    WorkerNodeState::Connected {
247                                        control_stream: ControlStreamNode {
248                                            worker_id: node_id as _,
249                                            host: node_host,
250                                            handle,
251                                        },
252                                        removed: false
253                                    }
254                                )
255                            )
256                            .is_none()
257                    );
258                    return;
259                }
260                Err(e) => {
261                    // It may happen that the dns information of newly registered worker node
262                    // has not been propagated to the meta node and cause error. Wait for a while and retry
263                    let delay = backoff.next().unwrap();
264                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
265                    sleep(delay).await;
266                }
267            }
268        }
269        error!(?node_host, "fail to create worker node after retry");
270        assert!(
271            self.workers
272                .insert(
273                    node_id,
274                    (
275                        node.clone(),
276                        WorkerNodeState::Reconnecting(ControlStreamManager::retry_connect(
277                            node,
278                            term_id.to_owned(),
279                            context,
280                        ))
281                    )
282                )
283                .is_none()
284        );
285    }
286
287    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
288        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
289            let (_, worker_state) = entry.get_mut();
290            match worker_state {
291                WorkerNodeState::Connected { removed, .. } => {
292                    info!(worker_id = %node.id, "mark connected worker as removed");
293                    *removed = true;
294                }
295                WorkerNodeState::Reconnecting(_) => {
296                    info!(worker_id = %node.id, "remove worker");
297                    entry.remove();
298                }
299            }
300        }
301    }
302
303    fn retry_connect(
304        node: WorkerNode,
305        term_id: String,
306        context: Arc<impl GlobalBarrierWorkerContext>,
307    ) -> BoxFuture<'static, StreamingControlHandle> {
308        async move {
309            let mut attempt = 0;
310            let backoff = ExponentialBackoff::from_millis(100)
311                .max_delay(Duration::from_mins(1))
312                .factor(5);
313            let init_request = PbInitRequest { term_id };
314            for delay in backoff {
315                attempt += 1;
316                sleep(delay).await;
317                match context.new_control_stream(&node, &init_request).await {
318                    Ok(handle) => {
319                        return handle;
320                    }
321                    Err(e) => {
322                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
323                    }
324                }
325            }
326            unreachable!("end of retry backoff")
327        }.boxed()
328    }
329
330    pub(super) async fn recover(
331        env: MetaSrvEnv,
332        nodes: &HashMap<WorkerId, WorkerNode>,
333        term_id: &str,
334        context: Arc<impl GlobalBarrierWorkerContext>,
335    ) -> Self {
336        let reset_start_time = Instant::now();
337        let init_request = PbInitRequest {
338            term_id: term_id.to_owned(),
339        };
340        let init_request = &init_request;
341        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
342            let result = context.new_control_stream(node, init_request).await;
343            (*worker_id, node.clone(), result)
344        }))
345        .await;
346        let mut unconnected_workers = HashSet::new();
347        let mut workers = HashMap::new();
348        for (worker_id, node, result) in nodes {
349            match result {
350                Ok(handle) => {
351                    let control_stream = ControlStreamNode {
352                        worker_id: node.id,
353                        host: node.host.clone().unwrap(),
354                        handle,
355                    };
356                    assert!(
357                        workers
358                            .insert(
359                                worker_id,
360                                (
361                                    node,
362                                    WorkerNodeState::Connected {
363                                        control_stream,
364                                        removed: false
365                                    }
366                                )
367                            )
368                            .is_none()
369                    );
370                }
371                Err(e) => {
372                    unconnected_workers.insert(worker_id);
373                    warn!(
374                        e = %e.as_report(),
375                        %worker_id,
376                        ?node,
377                        "failed to connect to node"
378                    );
379                    assert!(
380                        workers
381                            .insert(
382                                worker_id,
383                                (
384                                    node.clone(),
385                                    WorkerNodeState::Reconnecting(Self::retry_connect(
386                                        node,
387                                        term_id.to_owned(),
388                                        context.clone()
389                                    ))
390                                )
391                            )
392                            .is_none()
393                    );
394                }
395            }
396        }
397
398        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");
399
400        Self { workers, env }
401    }
402
403    /// Clear all nodes and response streams in the manager.
404    pub(super) fn clear(&mut self) {
405        *self = Self::new(self.env.clone());
406    }
407}
408
/// A worker whose control stream has just been opened, borrowed from `ControlStreamManager`.
/// The caller can send the worker's initial requests through `initialize`.
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}
413
414impl<'a> WorkerNodeConnected<'a> {
415    pub(super) fn initialize(self, partial_graphs: impl Iterator<Item = PartialGraphId>) {
416        for partial_graph_id in partial_graphs {
417            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
418                request: Some(
419                    streaming_control_stream_request::Request::CreatePartialGraph(
420                        PbCreatePartialGraphRequest { partial_graph_id },
421                    ),
422                ),
423            }) {
424                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
425            }
426        }
427    }
428}
429
/// An event produced for a single worker when polling `ControlStreamManager`.
pub(super) enum WorkerNodeEvent<'a> {
    /// A response, or an error, taken from the worker's control response stream.
    Response(MetaResult<streaming_control_stream_response::Response>),
    /// A background reconnection to the worker completed and the worker is `Connected` again.
    Connected(WorkerNodeConnected<'a>),
}
434
impl ControlStreamManager {
    /// Polls each tracked worker once, in the map's iteration order, and returns the first
    /// event that is ready.
    ///
    /// `this_opt` holds `&'a mut Self` so that a completed reconnection can `take()` it and
    /// return a `WorkerNodeConnected` that borrows from the manager for `'a`. After that the
    /// option is `None`, and polling again panics with "Future polled after completion".
    ///
    /// When `poll_reconnect` is false, workers in `Reconnecting` are skipped entirely.
    ///
    /// On a response-stream error (end of stream, RPC error, empty response, `Shutdown`, or
    /// unexpected `Init`), the worker is handled as follows:
    /// - It is removed from the manager if it was marked `removed` or is shutting down.
    /// - Otherwise it is switched to a background reconnection.
    ///
    /// In either case the error is returned as a `Response` event.
    ///
    /// With no tracked workers this returns `Poll::Pending` without polling anything.
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=%node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                // Move the `&'a mut Self` out of `this_opt` so the returned
                                // event can borrow the freshly connected worker for `'a`.
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            // Normalize into `Result<Response, (is_shutdown, MetaError)>`. The
                            // bool is true only for the `Shutdown` response.
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                                .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id);
                                                }
                                                Ok(resp)
                                            }
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    // Drop the worker if it was removed or is shutting down;
                                    // otherwise start reconnecting in the background.
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

    /// Waits for the next event from any worker, including completed reconnections.
    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    /// Waits for the next response (or stream error) from a connected worker.
    ///
    /// Workers that are reconnecting are not polled, so a `Connected` event cannot occur here.
    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }
}
581
/// Holds a database's checkpoint control until every one of its partial graphs has been
/// marked initialized via `partial_graph_initialized`.
pub(super) struct DatabaseInitialBarrierCollector {
    pub(super) database_id: DatabaseId,
    /// Partial graphs not yet marked initialized; `is_collected` is true once this is empty.
    pub(super) initializing_partial_graphs: HashSet<PartialGraphId>,
    /// Checkpoint control returned by `finish` once all partial graphs are collected.
    pub(super) database: DatabaseCheckpointControl,
}
587
588impl Debug for DatabaseInitialBarrierCollector {
589    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
590        f.debug_struct("DatabaseInitialBarrierCollector")
591            .field("database_id", &self.database_id)
592            .field("initializing_graphs", &self.initializing_partial_graphs)
593            .finish()
594    }
595}
596
597impl DatabaseInitialBarrierCollector {
598    pub(super) fn is_collected(&self) -> bool {
599        self.initializing_partial_graphs.is_empty()
600    }
601
602    pub(super) fn partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
603        assert!(self.initializing_partial_graphs.remove(&partial_graph_id));
604    }
605
606    pub(super) fn all_partial_graphs(&self) -> impl Iterator<Item = PartialGraphId> + '_ {
607        database_partial_graphs(
608            self.database_id,
609            self.database
610                .independent_checkpoint_job_controls
611                .keys()
612                .copied(),
613        )
614    }
615
616    pub(super) fn finish(self) -> DatabaseCheckpointControl {
617        assert!(self.is_collected());
618        self.database
619    }
620
621    pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
622        self.database.is_valid_after_worker_err(worker_id)
623    }
624}
625
626impl PartialGraphRecoverer<'_> {
627    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
628    #[expect(clippy::too_many_arguments)]
629    pub(super) fn inject_database_initial_barrier(
630        &mut self,
631        database_id: DatabaseId,
632        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
633        job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
634        state_table_committed_epochs: &mut HashMap<TableId, u64>,
635        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
636        fragment_relations: &FragmentDownstreamRelation,
637        stream_actors: &HashMap<ActorId, StreamActor>,
638        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
639        background_jobs: &mut HashSet<JobId>,
640        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
641        is_paused: bool,
642        hummock_version_stats: &HummockVersionStats,
643        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
644        batch_refresh: HashMap<JobId, BatchRefreshRenderResult>,
645    ) -> MetaResult<DatabaseCheckpointControl> {
646        fn collect_source_splits(
647            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
648            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
649        ) -> HashMap<ActorId, Vec<SplitImpl>> {
650            fragment_infos
651                .flat_map(|info| info.actors.keys())
652                .filter_map(|actor_id| {
653                    let actor_id = *actor_id as ActorId;
654                    source_splits
655                        .remove(&actor_id)
656                        .map(|splits| (actor_id, splits))
657                })
658                .collect()
659        }
660        fn build_mutation(
661            splits: &HashMap<ActorId, Vec<SplitImpl>>,
662            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
663            backfill_orders: &ExtendedFragmentBackfillOrder,
664            is_paused: bool,
665        ) -> Mutation {
666            let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
667                .into_iter()
668                .collect();
669            Mutation::Add(AddMutation {
670                // Actors built during recovery is not treated as newly added actors.
671                actor_dispatchers: Default::default(),
672                added_actors: Default::default(),
673                actor_splits: build_actor_connector_splits(splits),
674                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
675                    splits: cdc_table_snapshot_split_assignment,
676                }),
677                pause: is_paused,
678                subscriptions_to_add: Default::default(),
679                backfill_nodes_to_pause,
680                new_upstream_sinks: Default::default(),
681            })
682        }
683
684        fn resolve_jobs_committed_epoch(
685            state_table_committed_epochs: &mut HashMap<TableId, u64>,
686            table_ids: impl Iterator<Item = TableId>,
687        ) -> u64 {
688            let mut epochs = table_ids.map(|table_id| {
689                (
690                    table_id,
691                    state_table_committed_epochs
692                        .remove(&table_id)
693                        .expect("should exist"),
694                )
695            });
696            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
697            for (table_id, epoch) in epochs {
698                assert_eq!(
699                    prev_epoch, epoch,
700                    "{} has different committed epoch to {}",
701                    first_table_id, table_id
702                );
703            }
704            prev_epoch
705        }
706        fn job_backfill_orders(
707            job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
708            job_id: JobId,
709        ) -> UserDefinedFragmentBackfillOrder {
710            UserDefinedFragmentBackfillOrder::new(
711                job_extra_info
712                    .get(&job_id)
713                    .and_then(|info| info.backfill_orders.clone())
714                    .map_or_else(HashMap::new, |orders| orders.0),
715            )
716        }
717
718        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
719            .keys()
720            .filter_map(|job_id| {
721                mv_depended_subscriptions
722                    .remove(&job_id.as_mv_table_id())
723                    .map(|subscriptions| {
724                        (
725                            job_id.as_mv_table_id(),
726                            subscriptions
727                                .into_iter()
728                                .map(|(subscription_id, retention)| {
729                                    (
730                                        subscription_id.as_subscriber_id(),
731                                        SubscriberType::Subscription(retention),
732                                    )
733                                })
734                                .collect(),
735                        )
736                    })
737            })
738            .collect();
739
740        // Batch-refresh jobs are rendered outside `jobs`, but their upstream tables
741        // must still start with log-store-enabled subscribers after recovery.
742        for (job_id, render_result) in &batch_refresh {
743            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
744                render_result
745                    .fragment_infos
746                    .values()
747                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
748            )?
749            .0
750            .ok_or_else(|| anyhow!("batch refresh job {} has no snapshot backfill info", job_id))?;
751
752            for upstream_table_id in snapshot_backfill_info
753                .upstream_mv_table_id_to_backfill_epoch
754                .keys()
755            {
756                subscribers
757                    .entry(*upstream_table_id)
758                    .or_default()
759                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
760                    .expect("non-duplicate");
761            }
762        }
763
764        let mut database_jobs = HashMap::new();
765        let mut snapshot_backfill_jobs = HashMap::new();
766
767        for (job_id, job_fragments) in jobs {
768            if background_jobs.remove(&job_id) {
769                if job_fragments.values().any(|fragment| {
770                    fragment
771                        .fragment_type_mask
772                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
773                }) {
774                    debug!(%job_id, "recovered snapshot backfill job");
775                    snapshot_backfill_jobs.insert(job_id, job_fragments);
776                } else {
777                    database_jobs.insert(job_id, (job_fragments, true));
778                }
779            } else {
780                database_jobs.insert(job_id, (job_fragments, false));
781            }
782        }
783
784        let database_job_log_epochs: HashMap<_, _> = database_jobs
785            .keys()
786            .filter_map(|job_id| {
787                state_table_log_epochs
788                    .remove(&job_id.as_mv_table_id())
789                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
790            })
791            .collect();
792
793        let prev_epoch = resolve_jobs_committed_epoch(
794            state_table_committed_epochs,
795            InflightFragmentInfo::existing_table_ids(
796                database_jobs.values().flat_map(|(job, _)| job.values()),
797            ),
798        );
799        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
800        // Use a different `curr_epoch` for each recovery attempt.
801        let curr_epoch = prev_epoch.next();
802        let barrier_info = BarrierInfo {
803            prev_epoch,
804            curr_epoch,
805            kind: BarrierKind::Initial,
806        };
807
808        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
809        for (job_id, fragment_infos) in snapshot_backfill_jobs {
810            let committed_epoch = resolve_jobs_committed_epoch(
811                state_table_committed_epochs,
812                InflightFragmentInfo::existing_table_ids(fragment_infos.values()),
813            );
814            if committed_epoch == barrier_info.prev_epoch() {
815                info!(
816                    "recovered creating snapshot backfill job {} catch up with upstream already",
817                    job_id
818                );
819                database_jobs
820                    .try_insert(job_id, (fragment_infos, true))
821                    .expect("non-duplicate");
822                continue;
823            }
824            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
825                fragment_infos
826                    .values()
827                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
828            )?
829            .0
830            .ok_or_else(|| {
831                anyhow!(
832                    "recovered snapshot backfill job {} has no snapshot backfill info",
833                    job_id
834                )
835            })?;
836            let mut snapshot_epoch = None;
837            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
838                .upstream_mv_table_id_to_backfill_epoch
839                .keys()
840                .cloned()
841                .collect();
842            for (upstream_table_id, epoch) in
843                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
844            {
845                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
846                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
847                if *snapshot_epoch != epoch {
848                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
849                }
850            }
851            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
852                anyhow!(
853                    "snapshot backfill job {} has not set snapshot epoch",
854                    job_id
855                )
856            })?;
857            for upstream_table_id in &upstream_table_ids {
858                subscribers
859                    .entry(*upstream_table_id)
860                    .or_default()
861                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
862                    .expect("non-duplicate");
863            }
864            ongoing_snapshot_backfill_jobs
865                .try_insert(
866                    job_id,
867                    (
868                        fragment_infos,
869                        upstream_table_ids,
870                        committed_epoch,
871                        snapshot_epoch,
872                    ),
873                )
874                .expect("non-duplicated");
875        }
876
877        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
878            HashMap::new();
879
880        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
881            database_jobs
882                .into_iter()
883                .map(|(job_id, (fragment_infos, is_background_creating))| {
884                    let status = if is_background_creating {
885                        let backfill_ordering = job_backfill_orders(job_extra_info, job_id);
886                        let backfill_ordering = StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
887                            backfill_ordering,
888                            fragment_relations,
889                            || fragment_infos.iter().map(|(fragment_id, fragment)| {
890                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
891                        }));
892                        let locality_fragment_state_table_mapping =
893                            build_locality_fragment_state_table_mapping(&fragment_infos);
894                        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
895                            &backfill_ordering,
896                            &fragment_infos,
897                            locality_fragment_state_table_mapping,
898                        );
899                        CreateStreamingJobStatus::Creating {
900                            tracker: CreateMviewProgressTracker::recover(
901                                job_id,
902                                &fragment_infos,
903                                backfill_order_state,
904                                hummock_version_stats,
905                            ),
906                        }
907                    } else {
908                        CreateStreamingJobStatus::Created
909                    };
910                    let cdc_table_backfill_tracker =
911                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
912                            let cdc_fragment = fragment_infos
913                                .values()
914                                .find(|fragment| {
915                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
916                                        fragment.fragment_type_mask,
917                                        &fragment.nodes,
918                                    )
919                                    .is_some()
920                                })
921                                .expect("should have parallel cdc fragment");
922                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
923                            let mut tracker =
924                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
925                            cdc_table_snapshot_split_assignment
926                                .extend(tracker.reassign_splits(cdc_actors)?);
927                            Some(tracker)
928                        } else {
929                            None
930                        };
931                    Ok((
932                        job_id,
933                        InflightStreamingJobInfo {
934                            job_id,
935                            fragment_infos,
936                            subscribers: subscribers
937                                .remove(&job_id.as_mv_table_id())
938                                .unwrap_or_default(),
939                            status,
940                            cdc_table_backfill_tracker,
941                        },
942                    ))
943                })
944                .try_collect::<_, _, MetaError>()
945        }?;
946
947        let control_stream_manager = self.control_stream_manager();
948        let mut builder = FragmentEdgeBuilder::new(
949            database_jobs
950                .values()
951                .flat_map(|job| {
952                    let partial_graph_id = to_partial_graph_id(database_id, None);
953                    job.fragment_infos().map(move |info| {
954                        (
955                            info.fragment_id,
956                            EdgeBuilderFragmentInfo::from_inflight(
957                                info,
958                                partial_graph_id,
959                                control_stream_manager,
960                            ),
961                        )
962                    })
963                })
964                .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
965                    |(job_id, (fragments, ..))| {
966                        let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
967                        fragments.values().map(move |fragment| {
968                            (
969                                fragment.fragment_id,
970                                EdgeBuilderFragmentInfo::from_inflight(
971                                    fragment,
972                                    partial_graph_id,
973                                    control_stream_manager,
974                                ),
975                            )
976                        })
977                    },
978                )),
979        );
980        builder.add_relations(fragment_relations);
981        let mut edges = builder.build();
982
983        {
984            let new_actors =
985                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
986                    job.fragment_infos.values().map(move |fragment_infos| {
987                        (
988                            fragment_infos.fragment_id,
989                            &fragment_infos.nodes,
990                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
991                                (
992                                    stream_actors.get(actor_id).expect("should exist"),
993                                    actor.worker_id,
994                                )
995                            }),
996                            job.subscribers.keys().copied(),
997                        )
998                    })
999                }));
1000
1001            let nodes_actors =
1002                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
1003            let database_job_source_splits =
1004                collect_source_splits(database_jobs.values().flatten(), source_splits);
1005            let database_backfill_orders =
1006                UserDefinedFragmentBackfillOrder::merge(database_jobs.values().map(|job| {
1007                    if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
1008                        job_backfill_orders(job_extra_info, job.job_id)
1009                    } else {
1010                        UserDefinedFragmentBackfillOrder::default()
1011                    }
1012                }));
1013            let database_backfill_orders =
1014                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
1015                    database_backfill_orders,
1016                    fragment_relations,
1017                    || {
1018                        database_jobs.values().flat_map(|job_fragments| {
1019                            job_fragments
1020                                .fragment_infos
1021                                .iter()
1022                                .map(|(fragment_id, fragment)| {
1023                                    (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
1024                                })
1025                        })
1026                    },
1027                );
1028            let mutation = build_mutation(
1029                &database_job_source_splits,
1030                cdc_table_snapshot_split_assignment,
1031                &database_backfill_orders,
1032                is_paused,
1033            );
1034
1035            let partial_graph_id = to_partial_graph_id(database_id, None);
1036            self.recover_graph(
1037                partial_graph_id,
1038                mutation,
1039                &barrier_info,
1040                &nodes_actors,
1041                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
1042                new_actors,
1043                DatabaseCheckpointControlMetrics::new(database_id),
1044            )?;
1045            debug!(
1046                %database_id,
1047                "inject initial barrier"
1048            );
1049        };
1050
1051        let mut independent_checkpoint_job_controls: HashMap<
1052            JobId,
1053            IndependentCheckpointJobControl,
1054        > = HashMap::new();
1055        for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
1056            ongoing_snapshot_backfill_jobs
1057        {
1058            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
1059                (
1060                    fragment_infos.fragment_id,
1061                    &fragment_infos.nodes,
1062                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
1063                        (
1064                            stream_actors.get(actor_id).expect("should exist"),
1065                            actor.worker_id,
1066                        )
1067                    }),
1068                    vec![], // no subscribers for backfilling jobs,
1069                )
1070            }));
1071
1072            let database_job_source_splits =
1073                collect_source_splits(database_jobs.values().flatten(), source_splits);
1074            assert!(
1075                !cdc_table_snapshot_splits.contains_key(&job_id),
1076                "snapshot backfill job {job_id} should not have cdc backfill"
1077            );
1078            if is_paused {
1079                bail!("should not pause when having snapshot backfill job {job_id}");
1080            }
1081            let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
1082            let job_backfill_orders =
1083                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
1084                    job_backfill_orders,
1085                    fragment_relations,
1086                    || {
1087                        info.iter().map(|(fragment_id, fragment)| {
1088                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
1089                        })
1090                    },
1091                );
1092            let mutation = build_mutation(
1093                &database_job_source_splits,
1094                Default::default(), // no cdc backfill job for
1095                &job_backfill_orders,
1096                false,
1097            );
1098
1099            let job = CreatingStreamingJobControl::recover(
1100                database_id,
1101                job_id,
1102                upstream_table_ids,
1103                &database_job_log_epochs,
1104                snapshot_epoch,
1105                committed_epoch,
1106                &barrier_info,
1107                info,
1108                job_backfill_orders,
1109                fragment_relations,
1110                hummock_version_stats,
1111                node_actors,
1112                mutation.clone(),
1113                self,
1114            )?;
1115            independent_checkpoint_job_controls.insert(
1116                job_id,
1117                IndependentCheckpointJobControl::CreatingStreamingJob(job),
1118            );
1119        }
1120
1121        // Recover batch refresh jobs (both idle and consuming snapshot).
1122        // Actors were already rendered by `render_runtime_info()`.
1123        for (job_id, render_result) in batch_refresh {
1124            background_jobs.remove(&job_id);
1125            debug!(%job_id, "recovered batch refresh job");
1126
1127            // Resolve committed epoch from state tables.
1128            let committed_epoch = resolve_jobs_committed_epoch(
1129                state_table_committed_epochs,
1130                InflightFragmentInfo::existing_table_ids(render_result.fragment_infos.values()),
1131            );
1132
1133            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
1134                render_result
1135                    .fragment_infos
1136                    .values()
1137                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
1138            )?
1139            .0
1140            .ok_or_else(|| anyhow!("batch refresh job {} has no snapshot backfill info", job_id))?;
1141
1142            let upstream_table_ids: HashSet<TableId> = snapshot_backfill_info
1143                .upstream_mv_table_id_to_backfill_epoch
1144                .keys()
1145                .copied()
1146                .collect();
1147            let snapshot_epoch = snapshot_backfill_info
1148                .upstream_mv_table_id_to_backfill_epoch
1149                .values()
1150                .find_map(|e| *e)
1151                .unwrap_or(committed_epoch);
1152
1153            let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
1154            let job_backfill_orders =
1155                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
1156                    job_backfill_orders,
1157                    fragment_relations,
1158                    || {
1159                        render_result
1160                            .fragment_infos
1161                            .iter()
1162                            .map(|(fid, f)| (*fid, f.fragment_type_mask, &f.nodes))
1163                    },
1164                );
1165            let mutation = build_mutation(
1166                &Default::default(), // batch refresh has no source splits
1167                Default::default(),
1168                &job_backfill_orders,
1169                false,
1170            );
1171
1172            let refresh_interval_sec = job_extra_info
1173                .get(&job_id)
1174                .and_then(|info| info.refresh_interval_sec)
1175                .expect("batch refresh job should have refresh_interval_sec in job extra info");
1176
1177            let job = BatchRefreshJobCheckpointControl::recover(
1178                database_id,
1179                job_id,
1180                upstream_table_ids,
1181                snapshot_epoch,
1182                committed_epoch,
1183                job_backfill_orders,
1184                hummock_version_stats,
1185                mutation,
1186                render_result,
1187                self,
1188                refresh_interval_sec,
1189            )?;
1190            independent_checkpoint_job_controls
1191                .insert(job_id, IndependentCheckpointJobControl::BatchRefresh(job));
1192        }
1193
1194        self.control_stream_manager()
1195            .env
1196            .shared_actor_infos()
1197            .recover_database(
1198                database_id,
1199                database_jobs
1200                    .values()
1201                    .flat_map(|info| {
1202                        info.fragment_infos()
1203                            .map(move |fragment| (fragment, info.job_id))
1204                    })
1205                    .chain(
1206                        independent_checkpoint_job_controls
1207                            .iter()
1208                            .flat_map(|(job_id, job)| {
1209                                let job_id = *job_id;
1210                                job.fragment_infos()
1211                                    .into_iter()
1212                                    .flat_map(move |infos| infos.values().map(move |f| (f, job_id)))
1213                            }),
1214                    ),
1215            );
1216
1217        let committed_epoch = barrier_info.prev_epoch();
1218        let new_epoch = barrier_info.curr_epoch;
1219        let database_info = InflightDatabaseInfo::recover(
1220            database_id,
1221            database_jobs.into_values(),
1222            self.control_stream_manager()
1223                .env
1224                .shared_actor_infos()
1225                .clone(),
1226        );
1227        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
1228        Ok(DatabaseCheckpointControl::recovery(
1229            database_id,
1230            database_state,
1231            committed_epoch,
1232            database_info,
1233            independent_checkpoint_job_controls,
1234        ))
1235    }
1236}
1237
1238impl ControlStreamManager {
1239    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
1240        self.workers
1241            .iter()
1242            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
1243                WorkerNodeState::Connected { control_stream, .. } => {
1244                    Some((*worker_id, control_stream))
1245                }
1246                WorkerNodeState::Reconnecting(_) => None,
1247            })
1248    }
1249
1250    pub(super) fn inject_barrier(
1251        &mut self,
1252        partial_graph_id: PartialGraphId,
1253        mutation: Option<Mutation>,
1254        barrier_info: &BarrierInfo,
1255        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
1256        table_ids_to_sync: impl Iterator<Item = TableId>,
1257        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
1258        mut new_actors: Option<StreamJobActorsToCreate>,
1259    ) -> MetaResult<NodeToCollect> {
1260        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
1261            "inject_barrier_err"
1262        ));
1263
1264        let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();
1265
1266        nodes_to_sync_table.iter().for_each(|worker_id| {
1267            assert!(node_actors.contains_key(worker_id), "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}");
1268        });
1269
1270        let mut node_need_collect = NodeToCollect::new();
1271        let table_ids_to_sync = table_ids_to_sync.collect_vec();
1272
1273        node_actors.iter()
1274            .try_for_each(|(worker_id, actor_ids_to_collect)| {
1275                assert!(!actor_ids_to_collect.is_empty(), "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}");
1276                let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
1277                    table_ids_to_sync.clone()
1278                } else {
1279                    vec![]
1280                };
1281
1282                let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
1283                    &&
1284                    let WorkerNodeState::Connected { control_stream, .. } = worker_state
1285                {
1286                    control_stream
1287                } else {
1288                    return Err(anyhow!("unconnected worker node {}", worker_id).into());
1289                };
1290
1291                {
1292                    let mutation = mutation.clone();
1293                    let barrier = Barrier {
1294                        epoch: Some(risingwave_pb::data::Epoch {
1295                            curr: barrier_info.curr_epoch(),
1296                            prev: barrier_info.prev_epoch(),
1297                        }),
1298                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
1299                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
1300                            .to_protobuf(),
1301                        kind: barrier_info.kind.to_protobuf() as i32,
1302                    };
1303
1304                    node.handle
1305                        .request_sender
1306                        .send(StreamingControlStreamRequest {
1307                            request: Some(
1308                                streaming_control_stream_request::Request::InjectBarrier(
1309                                    InjectBarrierRequest {
1310                                        request_id: Uuid::new_v4().to_string(),
1311                                        barrier: Some(barrier),
1312                                        actor_ids_to_collect: actor_ids_to_collect.iter().copied().collect(),
1313                                        table_ids_to_sync,
1314                                        partial_graph_id,
1315                                        actors_to_build: new_actors
1316                                            .as_mut()
1317                                            .map(|new_actors| new_actors.remove(worker_id))
1318                                            .into_iter()
1319                                            .flatten()
1320                                            .flatten()
1321                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
1322                                                FragmentBuildActorInfo {
1323                                                    fragment_id,
1324                                                    node: Some(node),
1325                                                    actors: actors
1326                                                        .into_iter()
1327                                                        .map(|(actor, upstreams, dispatchers)| {
1328                                                            BuildActorInfo {
1329                                                                actor_id: actor.actor_id,
1330                                                                fragment_upstreams: upstreams
1331                                                                    .into_iter()
1332                                                                    .map(|(fragment_id, upstreams)| {
1333                                                                        (
1334                                                                            fragment_id,
1335                                                                            UpstreamActors {
1336                                                                                actors: upstreams
1337                                                                                    .into_values()
1338                                                                                    .collect(),
1339                                                                            },
1340                                                                        )
1341                                                                    })
1342                                                                    .collect(),
1343                                                                dispatchers,
1344                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
1345                                                                mview_definition: actor.mview_definition,
1346                                                                expr_context: actor.expr_context,
1347                                                                config_override: actor.config_override.to_string(),
1348                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
1349                                                            }
1350                                                        })
1351                                                        .collect(),
1352                                                }
1353                                            })
1354                                            .collect(),
1355                                    },
1356                                ),
1357                            ),
1358                        })
1359                        .map_err(|_| {
1360                            MetaError::from(anyhow!(
1361                                "failed to send request to {} {:?}",
1362                                node.worker_id,
1363                                node.host
1364                            ))
1365                        })?;
1366
1367                    node_need_collect.insert(*worker_id);
1368                    Result::<_, MetaError>::Ok(())
1369                }
1370            })
1371            .inspect_err(|e| {
1372                // Record failure in event log.
1373                use risingwave_pb::meta::event_log;
1374                let event = event_log::EventInjectBarrierFail {
1375                    prev_epoch: barrier_info.prev_epoch(),
1376                    cur_epoch: barrier_info.curr_epoch(),
1377                    error: e.to_report_string(),
1378                };
1379                self.env
1380                    .event_log_manager_ref()
1381                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
1382            })?;
1383        Ok(node_need_collect)
1384    }
1385
1386    pub(super) fn add_partial_graph(&mut self, partial_graph_id: PartialGraphId) {
1387        self.connected_workers().for_each(|(_, node)| {
1388            if node
1389                .handle
1390                .request_sender
1391                .send(StreamingControlStreamRequest {
1392                    request: Some(
1393                        streaming_control_stream_request::Request::CreatePartialGraph(
1394                            CreatePartialGraphRequest {
1395                                partial_graph_id,
1396                            },
1397                        ),
1398                    ),
1399                }).is_err() {
1400                let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
1401                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker")
1402            }
1403        });
1404    }
1405
1406    pub(super) fn remove_partial_graphs(&mut self, partial_graph_ids: Vec<PartialGraphId>) {
1407        self.connected_workers().for_each(|(_, node)| {
1408            if node.handle
1409                .request_sender
1410                .send(StreamingControlStreamRequest {
1411                    request: Some(
1412                        streaming_control_stream_request::Request::RemovePartialGraph(
1413                            RemovePartialGraphRequest {
1414                                partial_graph_ids: partial_graph_ids.clone(),
1415                            },
1416                        ),
1417                    ),
1418                })
1419                .is_err()
1420            {
1421                warn!(worker_id = %node.worker_id,node = ?node.host,"failed to send remove partial graph request");
1422            }
1423        })
1424    }
1425
1426    pub(super) fn reset_partial_graphs(
1427        &mut self,
1428        partial_graph_ids: Vec<PartialGraphId>,
1429    ) -> HashSet<WorkerId> {
1430        self.connected_workers()
1431            .filter_map(|(worker_id, node)| {
1432                if node
1433                    .handle
1434                    .request_sender
1435                    .send(StreamingControlStreamRequest {
1436                        request: Some(
1437                            streaming_control_stream_request::Request::ResetPartialGraphs(
1438                                ResetPartialGraphsRequest {
1439                                    partial_graph_ids: partial_graph_ids.clone(),
1440                                },
1441                            ),
1442                        ),
1443                    })
1444                    .is_err()
1445                {
1446                    warn!(%worker_id, node = ?node.host,"failed to send reset database request");
1447                    None
1448                } else {
1449                    Some(worker_id)
1450                }
1451            })
1452            .collect()
1453    }
1454}
1455
1456impl GlobalBarrierWorkerContextImpl {
1457    pub(super) async fn new_control_stream_impl(
1458        &self,
1459        node: &WorkerNode,
1460        init_request: &PbInitRequest,
1461    ) -> MetaResult<StreamingControlHandle> {
1462        let handle = self
1463            .env
1464            .stream_client_pool()
1465            .get(node)
1466            .await?
1467            .start_streaming_control(init_request.clone())
1468            .await?;
1469        Ok(handle)
1470    }
1471}
1472
1473pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
1474    message: &str,
1475    errors: impl IntoIterator<Item = (WorkerId, E)>,
1476) -> MetaError {
1477    use std::fmt::Write;
1478
1479    use risingwave_common::error::error_request_copy;
1480    use risingwave_common::error::tonic::extra::Score;
1481
1482    let errors = errors.into_iter().collect_vec();
1483
1484    if errors.is_empty() {
1485        return anyhow!(message.to_owned()).into();
1486    }
1487
1488    // Create the error from the single error.
1489    let single_error = |(worker_id, e)| {
1490        anyhow::Error::from(e)
1491            .context(format!("{message}, in worker node {worker_id}"))
1492            .into()
1493    };
1494
1495    if errors.len() == 1 {
1496        return single_error(errors.into_iter().next().unwrap());
1497    }
1498
1499    // Find the error with the highest score.
1500    let max_score = errors
1501        .iter()
1502        .filter_map(|(_, e)| error_request_copy::<Score>(e))
1503        .max();
1504
1505    if let Some(max_score) = max_score {
1506        let mut errors = errors;
1507        let max_scored = errors
1508            .extract_if(.., |(_, e)| {
1509                error_request_copy::<Score>(e) == Some(max_score)
1510            })
1511            .next()
1512            .unwrap();
1513
1514        return single_error(max_scored);
1515    }
1516
1517    // The errors do not have scores, so simply concatenate them.
1518    let concat: String = errors
1519        .into_iter()
1520        .fold(format!("{message}: "), |mut s, (w, e)| {
1521            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
1522            s
1523        });
1524    anyhow!(concat).into()
1525}
1526
#[cfg(test)]
mod test_partial_graph_id {
    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};

    /// Encoding a database id plus an optional job id into a partial graph id and decoding it
    /// back must return the original pair, both with and without a job id.
    #[test]
    fn test_partial_graph_id_conversion() {
        let database_id = 233.into();
        let job_id = 233.into();

        let round_trip = |job| from_partial_graph_id(to_partial_graph_id(database_id, job));

        assert_eq!((database_id, None), round_trip(None));
        assert_eq!((database_id, Some(job_id)), round_trip(Some(job_id)));
    }
}