risingwave_meta/barrier/rpc.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::error::Error;
18use std::fmt::{Debug, Formatter};
19use std::future::poll_fn;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use anyhow::anyhow;
25use fail::fail_point;
26use futures::future::{BoxFuture, join_all};
27use futures::{FutureExt, StreamExt};
28use itertools::Itertools;
29use risingwave_common::bail;
30use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
31use risingwave_common::id::JobId;
32use risingwave_common::util::epoch::Epoch;
33use risingwave_common::util::tracing::TracingContext;
34use risingwave_connector::source::SplitImpl;
35use risingwave_meta_model::WorkerId;
36use risingwave_pb::common::{HostAddress, WorkerNode};
37use risingwave_pb::hummock::HummockVersionStats;
38use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
39use risingwave_pb::stream_plan::barrier_mutation::Mutation;
40use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
41use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
42use risingwave_pb::stream_service::inject_barrier_request::{
43    BuildActorInfo, FragmentBuildActorInfo,
44};
45use risingwave_pb::stream_service::streaming_control_stream_request::{
46    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
47    RemovePartialGraphRequest, ResetDatabaseRequest,
48};
49use risingwave_pb::stream_service::{
50    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
51    streaming_control_stream_request, streaming_control_stream_response,
52};
53use risingwave_rpc_client::StreamingControlHandle;
54use thiserror_ext::AsReport;
55use tokio::time::{Instant, sleep};
56use tokio_retry::strategy::ExponentialBackoff;
57use tracing::{debug, error, info, warn};
58use uuid::Uuid;
59
60use super::{BarrierKind, TracedEpoch};
61use crate::barrier::cdc_progress::CdcTableBackfillTracker;
62use crate::barrier::checkpoint::{
63    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
64};
65use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
66use crate::barrier::edge_builder::FragmentEdgeBuildResult;
67use crate::barrier::info::{
68    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
69    SubscriberType,
70};
71use crate::barrier::progress::CreateMviewProgressTracker;
72use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
73use crate::controller::fragment::InflightFragmentInfo;
74use crate::manager::MetaSrvEnv;
75use crate::model::{
76    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
77    SubscriptionId,
78};
79use crate::stream::cdc::{
80    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
81};
82use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
83use crate::{MetaError, MetaResult};
84
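/// Encodes an optional creating-job id into the `partial_graph_id` carried on stream RPCs.
/// `None` (the database-level graph) maps to the sentinel `u32::MAX`; a concrete job id maps
/// to its raw id, which is asserted to never equal the sentinel. Inverse of
/// `from_partial_graph_id`.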
85fn to_partial_graph_id(job_id: Option<JobId>) -> u32 {
86    job_id
87        .map(|job_id| {
88            assert_ne!(job_id, u32::MAX);
89            job_id.as_raw_id()
90        })
91        .unwrap_or(u32::MAX)
92}
93
94pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<JobId> {
95    if partial_graph_id == u32::MAX {
96        None
97    } else {
98        Some(JobId::new(partial_graph_id))
99    }
100}
101
102struct ControlStreamNode {
103    worker_id: WorkerId,
104    host: HostAddress,
105    handle: StreamingControlHandle,
106}
107
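/// Connection state of a single worker node: either an established control stream (possibly
/// already marked `removed` by `remove_worker`), or a pending reconnect future produced by
/// `retry_connect`.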
108enum WorkerNodeState {
109    Connected {
110        control_stream: ControlStreamNode,
111        removed: bool,
112    },
113    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
114}
115
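/// Owns the streaming control channels between the global barrier worker and every known
/// compute worker, and multiplexes their responses for the barrier manager.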
116pub(super) struct ControlStreamManager {
117    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
118    pub env: MetaSrvEnv,
119}
120
121impl ControlStreamManager {
122    pub(super) fn new(env: MetaSrvEnv) -> Self {
123        Self {
124            workers: Default::default(),
125            env,
126        }
127    }
128
129    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
130        self.workers[&worker_id].0.host.clone().unwrap()
131    }
132
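    /// Register a newly joined worker and establish its control stream, retrying a bounded
    /// number of times with exponential backoff (e.g. to tolerate DNS propagation delay).
    /// A pending reconnect for the same worker is dropped and replaced.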
133    pub(super) async fn add_worker(
134        &mut self,
135        node: WorkerNode,
136        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
137        term_id: String,
138        context: &impl GlobalBarrierWorkerContext,
139    ) {
140        let node_id = node.id;
141        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
142            let (existing_node, worker_state) = entry.get();
143            assert_eq!(existing_node.host, node.host);
144            warn!(id = %node.id, host = ?node.host, "node already exists");
145            match worker_state {
146                WorkerNodeState::Connected { .. } => {
147                    warn!(id = %node.id, host = ?node.host, "new node already connected");
148                    return;
149                }
150                WorkerNodeState::Reconnecting(_) => {
151                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
152                    entry.remove();
153                }
154            }
155        }
156        let node_host = node.host.clone().unwrap();
157        let mut backoff = ExponentialBackoff::from_millis(100)
158            .max_delay(Duration::from_secs(3))
159            .factor(5);
160        const MAX_RETRY: usize = 5;
161        for i in 1..=MAX_RETRY {
162            match context
163                .new_control_stream(
164                    &node,
165                    &PbInitRequest {
166                        term_id: term_id.clone(),
167                    },
168                )
169                .await
170            {
171                Ok(mut handle) => {
172                    WorkerNodeConnected {
173                        handle: &mut handle,
174                        node: &node,
175                    }
176                    .initialize(inflight_infos);
177                    info!(?node_host, "add control stream worker");
178                    assert!(
179                        self.workers
180                            .insert(
181                                node_id,
182                                (
183                                    node,
184                                    WorkerNodeState::Connected {
185                                        control_stream: ControlStreamNode {
186                                            worker_id: node_id as _,
187                                            host: node_host,
188                                            handle,
189                                        },
190                                        removed: false
191                                    }
192                                )
193                            )
194                            .is_none()
195                    );
196                    return;
197                }
198                Err(e) => {
199                    // The DNS information of a newly registered worker node may not have been
200                    // propagated to the meta node yet, causing an error. Wait for a while and retry.
201                    let delay = backoff.next().unwrap();
202                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to connect to worker node");
203                    sleep(delay).await;
204                }
205            }
206        }
207        error!(?node_host, "fail to create control stream to worker node after retry");
208    }
209
210    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
211        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
212            let (_, worker_state) = entry.get_mut();
213            match worker_state {
214                WorkerNodeState::Connected { removed, .. } => {
215                    info!(worker_id = %node.id, "mark connected worker as removed");
216                    *removed = true;
217                }
218                WorkerNodeState::Reconnecting(_) => {
219                    info!(worker_id = %node.id, "remove worker");
220                    entry.remove();
221                }
222            }
223        }
224    }
225
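    /// Build a future that keeps trying to re-establish the control stream to `node`, backing
    /// off exponentially up to one minute between attempts, until a connection succeeds.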
226    fn retry_connect(
227        node: WorkerNode,
228        term_id: String,
229        context: Arc<impl GlobalBarrierWorkerContext>,
230    ) -> BoxFuture<'static, StreamingControlHandle> {
231        async move {
232            let mut attempt = 0;
233            let backoff = ExponentialBackoff::from_millis(100)
234                .max_delay(Duration::from_mins(1))
235                .factor(5);
236            let init_request = PbInitRequest { term_id };
237            for delay in backoff {
238                attempt += 1;
239                sleep(delay).await;
240                match context.new_control_stream(&node, &init_request).await {
241                    Ok(handle) => {
242                        return handle;
243                    }
244                    Err(e) => {
245                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
246                    }
247                }
248            }
249            unreachable!("end of retry backoff")
250        }.boxed()
251    }
252
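    /// Rebuild the manager during recovery: connect to all known workers concurrently and
    /// park the ones that fail behind a `Reconnecting` future instead of failing recovery.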
253    pub(super) async fn recover(
254        env: MetaSrvEnv,
255        nodes: &HashMap<WorkerId, WorkerNode>,
256        term_id: &str,
257        context: Arc<impl GlobalBarrierWorkerContext>,
258    ) -> Self {
259        let reset_start_time = Instant::now();
260        let init_request = PbInitRequest {
261            term_id: term_id.to_owned(),
262        };
263        let init_request = &init_request;
264        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
265            let result = context.new_control_stream(node, init_request).await;
266            (*worker_id, node.clone(), result)
267        }))
268        .await;
269        let mut unconnected_workers = HashSet::new();
270        let mut workers = HashMap::new();
271        for (worker_id, node, result) in nodes {
272            match result {
273                Ok(handle) => {
274                    let control_stream = ControlStreamNode {
275                        worker_id: node.id,
276                        host: node.host.clone().unwrap(),
277                        handle,
278                    };
279                    assert!(
280                        workers
281                            .insert(
282                                worker_id,
283                                (
284                                    node,
285                                    WorkerNodeState::Connected {
286                                        control_stream,
287                                        removed: false
288                                    }
289                                )
290                            )
291                            .is_none()
292                    );
293                }
294                Err(e) => {
295                    unconnected_workers.insert(worker_id);
296                    warn!(
297                        e = %e.as_report(),
298                        %worker_id,
299                        ?node,
300                        "failed to connect to node"
301                    );
302                    assert!(
303                        workers
304                            .insert(
305                                worker_id,
306                                (
307                                    node.clone(),
308                                    WorkerNodeState::Reconnecting(Self::retry_connect(
309                                        node,
310                                        term_id.to_owned(),
311                                        context.clone()
312                                    ))
313                                )
314                            )
315                            .is_none()
316                    );
317                }
318            }
319        }
320
321        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");
322
323        Self { workers, env }
324    }
325
326    /// Clear all nodes and response streams in the manager.
327    pub(super) fn clear(&mut self) {
328        *self = Self::new(self.env.clone());
329    }
330}
331
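/// Borrowed view of a freshly (re)connected worker, used to replay the `CreatePartialGraph`
/// requests for all in-flight databases and creating jobs onto the new control stream.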
332pub(super) struct WorkerNodeConnected<'a> {
333    node: &'a WorkerNode,
334    handle: &'a mut StreamingControlHandle,
335}
336
337impl<'a> WorkerNodeConnected<'a> {
338    pub(super) fn initialize(
339        self,
340        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
341    ) {
342        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
343            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
344                request: Some(
345                    streaming_control_stream_request::Request::CreatePartialGraph(request),
346                ),
347            }) {
348                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
349            }
350        }
351    }
352}
353
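/// Event yielded by `ControlStreamManager::next_event`: either a response from a connected
/// worker's control stream, or a notification that a previously disconnected worker has
/// reconnected, carrying a handle that can be used to re-initialize it.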
354pub(super) enum WorkerNodeEvent<'a> {
355    Response(MetaResult<streaming_control_stream_response::Response>),
356    Connected(WorkerNodeConnected<'a>),
357}
358
359impl ControlStreamManager {
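    /// Poll all workers for the next event. `this_opt` is an `Option` so that, once a
    /// reconnect completes, the manager can be re-borrowed and a `'a`-lived handle to the new
    /// stream can be returned; the surrounding future must not be polled again after that.
    /// With `poll_reconnect == false`, pending reconnect futures are skipped.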
360    fn poll_next_event<'a>(
361        this_opt: &mut Option<&'a mut Self>,
362        cx: &mut Context<'_>,
363        term_id: &str,
364        context: &Arc<impl GlobalBarrierWorkerContext>,
365        poll_reconnect: bool,
366    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
367        let this = this_opt.as_mut().expect("Future polled after completion");
368        if this.workers.is_empty() {
369            return Poll::Pending;
370        }
371        {
372            for (&worker_id, (node, worker_state)) in &mut this.workers {
373                let control_stream = match worker_state {
374                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
375                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
376                        continue;
377                    }
378                    WorkerNodeState::Reconnecting(join_handle) => {
379                        match join_handle.poll_unpin(cx) {
380                            Poll::Ready(handle) => {
381                                info!(id=%node.id, host=?node.host, "reconnected to worker");
382                                *worker_state = WorkerNodeState::Connected {
383                                    control_stream: ControlStreamNode {
384                                        worker_id: node.id,
385                                        host: node.host.clone().unwrap(),
386                                        handle,
387                                    },
388                                    removed: false,
389                                };
390                                let this = this_opt.take().expect("should exist");
391                                let (node, worker_state) =
392                                    this.workers.get_mut(&worker_id).expect("should exist");
393                                let WorkerNodeState::Connected { control_stream, .. } =
394                                    worker_state
395                                else {
396                                    unreachable!()
397                                };
398                                return Poll::Ready((
399                                    worker_id,
400                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
401                                        node,
402                                        handle: &mut control_stream.handle,
403                                    }),
404                                ));
405                            }
406                            Poll::Pending => {
407                                continue;
408                            }
409                        }
410                    }
411                };
412                match control_stream.handle.response_stream.poll_next_unpin(cx) {
413                    Poll::Ready(result) => {
414                        {
415                            let result = result
416                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
417                                .and_then(|result| {
418                                    result.map_err(|err| -> (bool, MetaError) {(false, err.into())}).and_then(|resp| {
419                                        match resp
420                                            .response
421                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
422                                        {
423                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
424                                                "worker node {worker_id} is shutting down"
425                                            )
426                                                .into())),
427                                            streaming_control_stream_response::Response::Init(_) => {
428                                                // This arm should be unreachable.
429                                                Err((false, anyhow!("get unexpected init response").into()))
430                                            }
431                                            resp => {
432                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
433                                                    assert_eq!(worker_id, barrier_resp.worker_id);
434                                                }
435                                                Ok(resp)
436                                            },
437                                        }
438                                    })
439                                });
440                            let result = match result {
441                                Ok(resp) => Ok(resp),
442                                Err((shutdown, err)) => {
443                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
444                                    let WorkerNodeState::Connected { removed, .. } = worker_state
445                                    else {
446                                        unreachable!("checked connected")
447                                    };
448                                    if *removed || shutdown {
449                                        this.workers.remove(&worker_id);
450                                    } else {
451                                        *worker_state = WorkerNodeState::Reconnecting(
452                                            ControlStreamManager::retry_connect(
453                                                node.clone(),
454                                                term_id.to_owned(),
455                                                context.clone(),
456                                            ),
457                                        );
458                                    }
459                                    Err(err)
460                                }
461                            };
462                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
463                        }
464                    }
465                    Poll::Pending => {
466                        continue;
467                    }
468                }
469            }
470        };
471
472        Poll::Pending
473    }
474
475    #[await_tree::instrument("control_stream_next_event")]
476    pub(super) async fn next_event<'a>(
477        &'a mut self,
478        term_id: &str,
479        context: &Arc<impl GlobalBarrierWorkerContext>,
480    ) -> (WorkerId, WorkerNodeEvent<'a>) {
481        let mut this = Some(self);
482        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
483    }
484
485    #[await_tree::instrument("control_stream_next_response")]
486    pub(super) async fn next_response(
487        &mut self,
488        term_id: &str,
489        context: &Arc<impl GlobalBarrierWorkerContext>,
490    ) -> (
491        WorkerId,
492        MetaResult<streaming_control_stream_response::Response>,
493    ) {
494        let mut this = Some(self);
495        let (worker_id, event) =
496            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
497        match event {
498            WorkerNodeEvent::Response(result) => (worker_id, result),
499            WorkerNodeEvent::Connected(_) => {
500                unreachable!("set poll_reconnect=false")
501            }
502        }
503    }
504
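    /// For each database, emit one `CreatePartialGraph` request for the database-level graph
    /// (partial graph id `u32::MAX`) plus one per in-flight creating job.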
505    fn collect_init_partial_graph(
506        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
507    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
508        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
509            [PbCreatePartialGraphRequest {
510                partial_graph_id: to_partial_graph_id(None),
511                database_id,
512            }]
513            .into_iter()
514            .chain(
515                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
516                    partial_graph_id: to_partial_graph_id(Some(job_id)),
517                    database_id,
518                }),
519            )
520        })
521    }
522}
523
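/// Tracks collection of the initial barrier injected for one database during recovery. Once
/// every worker and every recovered creating job has reported back, `finish` converts the
/// collector into a `DatabaseCheckpointControl`.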
524pub(super) struct DatabaseInitialBarrierCollector {
525    database_id: DatabaseId,
526    node_to_collect: NodeToCollect,
527    database_state: BarrierWorkerState,
528    database_info: InflightDatabaseInfo,
529    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
530    committed_epoch: u64,
531}
532
533impl Debug for DatabaseInitialBarrierCollector {
534    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
535        f.debug_struct("DatabaseInitialBarrierCollector")
536            .field("database_id", &self.database_id)
537            .field("node_to_collect", &self.node_to_collect)
538            .finish()
539    }
540}
541
542impl DatabaseInitialBarrierCollector {
543    pub(super) fn is_collected(&self) -> bool {
544        self.node_to_collect.is_empty()
545            && self
546                .creating_streaming_job_controls
547                .values()
548                .all(|job| job.is_empty())
549    }
550
551    pub(super) fn database_state(
552        &self,
553    ) -> (
554        &BarrierWorkerState,
555        &HashMap<JobId, CreatingStreamingJobControl>,
556    ) {
557        (&self.database_state, &self.creating_streaming_job_controls)
558    }
559
560    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
561        assert_eq!(self.database_id, resp.database_id);
562        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
563            assert!(
564                !self
565                    .creating_streaming_job_controls
566                    .get_mut(&creating_job_id)
567                    .expect("should exist")
568                    .collect(resp),
569                "unlikely to finish backfill since just recovered"
570            );
571        } else {
572            assert_eq!(resp.epoch, self.committed_epoch);
573            assert!(self.node_to_collect.remove(&resp.worker_id).is_some());
574        }
575    }
576
577    pub(super) fn finish(self) -> DatabaseCheckpointControl {
578        assert!(self.is_collected());
579        DatabaseCheckpointControl::recovery(
580            self.database_id,
581            self.database_state,
582            self.committed_epoch,
583            self.database_info,
584            self.creating_streaming_job_controls,
585        )
586    }
587
588    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
589        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
590            && self
591                .creating_streaming_job_controls
592                .values_mut()
593                .all(|job| job.is_valid_after_worker_err(worker_id))
594    }
595}
596
597impl ControlStreamManager {
598    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
599    #[expect(clippy::too_many_arguments)]
600    pub(super) fn inject_database_initial_barrier(
601        &mut self,
602        database_id: DatabaseId,
603        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
604        state_table_committed_epochs: &mut HashMap<TableId, u64>,
605        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
606        fragment_relations: &FragmentDownstreamRelation,
607        edges: &mut FragmentEdgeBuildResult,
608        stream_actors: &HashMap<ActorId, StreamActor>,
609        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
610        background_jobs: &mut HashMap<JobId, String>,
611        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
612        is_paused: bool,
613        hummock_version_stats: &HummockVersionStats,
614        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
615    ) -> MetaResult<DatabaseInitialBarrierCollector> {
616        self.add_partial_graph(database_id, None);
617        fn collect_source_splits(
618            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
619            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
620        ) -> HashMap<ActorId, Vec<SplitImpl>> {
621            fragment_infos
622                .flat_map(|info| info.actors.keys())
623                .filter_map(|actor_id| {
624                    let actor_id = *actor_id as ActorId;
625                    source_splits
626                        .remove(&actor_id)
627                        .map(|splits| (actor_id, splits))
628                })
629                .collect()
630        }
631        fn build_mutation(
632            splits: &HashMap<ActorId, Vec<SplitImpl>>,
633            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
634            is_paused: bool,
635        ) -> Mutation {
636            Mutation::Add(AddMutation {
637                // Actors built during recovery are not treated as newly added actors.
638                actor_dispatchers: Default::default(),
639                added_actors: Default::default(),
640                actor_splits: build_actor_connector_splits(splits),
641                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
642                    splits: cdc_table_snapshot_split_assignment,
643                }),
644                pause: is_paused,
645                subscriptions_to_add: Default::default(),
646                // TODO(kwannoel): recover using backfill order plan
647                backfill_nodes_to_pause: Default::default(),
648                new_upstream_sinks: Default::default(),
649            })
650        }
651
652        fn resolve_jobs_committed_epoch<'a>(
653            state_table_committed_epochs: &mut HashMap<TableId, u64>,
654            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
655        ) -> u64 {
656            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
657                (
658                    table_id,
659                    state_table_committed_epochs
660                        .remove(&table_id)
661                        .expect("should exist"),
662                )
663            });
664            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
665            for (table_id, epoch) in epochs {
666                assert_eq!(
667                    prev_epoch, epoch,
668                    "{} has different committed epoch to {}",
669                    first_table_id, table_id
670                );
671            }
672            prev_epoch
673        }
674
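        // Seed per-table subscriber info from the MV-depended subscriptions; snapshot
        // backfill jobs that are still catching up register themselves as additional
        // subscribers below.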
675        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
676            .keys()
677            .filter_map(|job_id| {
678                mv_depended_subscriptions
679                    .remove(&job_id.as_mv_table_id())
680                    .map(|subscriptions| {
681                        (
682                            job_id.as_mv_table_id(),
683                            subscriptions
684                                .into_iter()
685                                .map(|(subscription_id, retention)| {
686                                    (
687                                        subscription_id.as_subscriber_id(),
688                                        SubscriberType::Subscription(retention),
689                                    )
690                                })
691                                .collect(),
692                        )
693                    })
694            })
695            .collect();
696
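        // Split the recovered jobs into database-level jobs and unfinished snapshot backfill
        // jobs; the latter are restored as in-progress creating jobs until they catch up with
        // the upstream epoch.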
697        let mut database_jobs = HashMap::new();
698        let mut snapshot_backfill_jobs = HashMap::new();
699
700        for (job_id, job_fragments) in jobs {
701            if let Some(definition) = background_jobs.remove(&job_id) {
702                if job_fragments.values().any(|fragment| {
703                    fragment
704                        .fragment_type_mask
705                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
706                }) {
707                    debug!(%job_id, definition, "recovered snapshot backfill job");
708                    snapshot_backfill_jobs.insert(job_id, (job_fragments, definition));
709                } else {
710                    database_jobs.insert(job_id, (job_fragments, Some(definition)));
711                }
712            } else {
713                database_jobs.insert(job_id, (job_fragments, None));
714            }
715        }
716
717        let database_job_log_epochs: HashMap<_, _> = database_jobs
718            .keys()
719            .filter_map(|job_id| {
720                state_table_log_epochs
721                    .remove(&job_id.as_mv_table_id())
722                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
723            })
724            .collect();
725
726        let prev_epoch = resolve_jobs_committed_epoch(
727            state_table_committed_epochs,
728            database_jobs.values().flat_map(|(job, _)| job.values()),
729        );
730        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
731        // Use a different `curr_epoch` for each recovery attempt.
732        let curr_epoch = prev_epoch.next();
733        let barrier_info = BarrierInfo {
734            prev_epoch,
735            curr_epoch,
736            kind: BarrierKind::Initial,
737        };
738
739        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
740        for (job_id, (fragment_infos, definition)) in snapshot_backfill_jobs {
741            let committed_epoch =
742                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
743            if committed_epoch == barrier_info.prev_epoch() {
744                info!(
745                    "recovered creating snapshot backfill job {} has already caught up with upstream",
746                    job_id
747                );
748                database_jobs
749                    .try_insert(job_id, (fragment_infos, Some(definition)))
750                    .expect("non-duplicate");
751                continue;
752            }
753            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
754                fragment_infos
755                    .values()
756                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
757            )?
758            .0
759            .ok_or_else(|| {
760                anyhow!(
761                    "recovered snapshot backfill job {} has no snapshot backfill info",
762                    job_id
763                )
764            })?;
765            let mut snapshot_epoch = None;
766            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
767                .upstream_mv_table_id_to_backfill_epoch
768                .keys()
769                .cloned()
770                .collect();
771            for (upstream_table_id, epoch) in
772                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
773            {
774                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
775                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
776                if *snapshot_epoch != epoch {
777                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
778                }
779            }
780            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
781                anyhow!(
782                    "snapshot backfill job {} has not set snapshot epoch",
783                    job_id
784                )
785            })?;
786            for upstream_table_id in &upstream_table_ids {
787                subscribers
788                    .entry(*upstream_table_id)
789                    .or_default()
790                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
791                    .expect("non-duplicate");
792            }
793            ongoing_snapshot_backfill_jobs
794                .try_insert(
795                    job_id,
796                    (
797                        fragment_infos,
798                        definition,
799                        upstream_table_ids,
800                        committed_epoch,
801                        snapshot_epoch,
802                    ),
803                )
804                .expect("non-duplicated");
805        }
806
807        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
808            HashMap::new();
809
810        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
811            database_jobs
812                .into_iter()
813                .map(|(job_id, (fragment_infos, background_job_definition))| {
814                    let status = if let Some(definition) = background_job_definition {
815                        CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::recover(
816                            job_id,
817                            definition,
818                            &fragment_infos,
819                            Default::default(),
820                            hummock_version_stats,
821                        ))
822                    } else {
823                        CreateStreamingJobStatus::Created
824                    };
825                    let cdc_table_backfill_tracker =
826                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
827                            let cdc_fragment = fragment_infos
828                                .values()
829                                .find(|fragment| {
830                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
831                                        fragment.fragment_type_mask,
832                                        &fragment.nodes,
833                                    )
834                                    .is_some()
835                                })
836                                .expect("should have parallel cdc fragment");
837                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
838                            let mut tracker =
839                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
840                            cdc_table_snapshot_split_assignment
841                                .extend(tracker.reassign_splits(cdc_actors)?);
842                            Some(tracker)
843                        } else {
844                            None
845                        };
846                    Ok((
847                        job_id,
848                        InflightStreamingJobInfo {
849                            job_id,
850                            fragment_infos,
851                            subscribers: subscribers
852                                .remove(&job_id.as_mv_table_id())
853                                .unwrap_or_default(),
854                            status,
855                            cdc_table_backfill_tracker,
856                        },
857                    ))
858                })
859                .try_collect::<_, _, MetaError>()
860        }?;
861
862        let node_to_collect = {
863            let new_actors =
864                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
865                    job.fragment_infos.values().map(move |fragment_infos| {
866                        (
867                            fragment_infos.fragment_id,
868                            &fragment_infos.nodes,
869                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
870                                (
871                                    stream_actors.get(actor_id).expect("should exist"),
872                                    actor.worker_id,
873                                )
874                            }),
875                            job.subscribers.keys().copied(),
876                        )
877                    })
878                }));
879
880            let nodes_actors =
881                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
882            let database_job_source_splits =
883                collect_source_splits(database_jobs.values().flatten(), source_splits);
884            let mutation = build_mutation(
885                &database_job_source_splits,
886                cdc_table_snapshot_split_assignment,
887                is_paused,
888            );
889
890            let node_to_collect = self.inject_barrier(
891                database_id,
892                None,
893                Some(mutation),
894                &barrier_info,
895                &nodes_actors,
896                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
897                Some(new_actors),
898            )?;
899            debug!(
900                ?node_to_collect,
901                %database_id,
902                "inject initial barrier"
903            );
904            node_to_collect
905        };
906
907        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
908            HashMap::new();
909        for (job_id, (info, definition, upstream_table_ids, committed_epoch, snapshot_epoch)) in
910            ongoing_snapshot_backfill_jobs
911        {
912            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
913                (
914                    fragment_infos.fragment_id,
915                    &fragment_infos.nodes,
916                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
917                        (
918                            stream_actors.get(actor_id).expect("should exist"),
919                            actor.worker_id,
920                        )
921                    }),
922                    vec![], // no subscribers for backfilling jobs
923                )
924            }));
925
926            let database_job_source_splits =
927                collect_source_splits(info.values(), source_splits);
928            assert!(
929                !cdc_table_snapshot_splits.contains_key(&job_id),
930                "snapshot backfill job {job_id} should not have cdc backfill"
931            );
932            if is_paused {
933                bail!("should not pause when having snapshot backfill job {job_id}");
934            }
935            let mutation = build_mutation(
936                &database_job_source_splits,
937                Default::default(), // snapshot backfill jobs have no CDC backfill splits
938                false,
939            );
940
941            creating_streaming_job_controls.insert(
942                job_id,
943                CreatingStreamingJobControl::recover(
944                    database_id,
945                    job_id,
946                    definition,
947                    upstream_table_ids,
948                    &database_job_log_epochs,
949                    snapshot_epoch,
950                    committed_epoch,
951                    &barrier_info,
952                    info,
953                    fragment_relations,
954                    hummock_version_stats,
955                    node_actors,
956                    mutation.clone(),
957                    self,
958                )?,
959            );
960        }
961
962        self.env.shared_actor_infos().recover_database(
963            database_id,
964            database_jobs
965                .values()
966                .flat_map(|info| {
967                    info.fragment_infos()
968                        .map(move |fragment| (fragment, info.job_id))
969                })
970                .chain(
971                    creating_streaming_job_controls
972                        .values()
973                        .flat_map(|job| job.fragment_infos_with_job_id()),
974                ),
975        );
976
977        let committed_epoch = barrier_info.prev_epoch();
978        let new_epoch = barrier_info.curr_epoch;
979        let database_info = InflightDatabaseInfo::recover(
980            database_id,
981            database_jobs.into_values(),
982            self.env.shared_actor_infos().clone(),
983        );
984        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
985        Ok(DatabaseInitialBarrierCollector {
986            database_id,
987            node_to_collect,
988            database_state,
989            database_info,
990            creating_streaming_job_controls,
991            committed_epoch,
992        })
993    }
994
995    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
996        self.workers
997            .iter()
998            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
999                WorkerNodeState::Connected { control_stream, .. } => {
1000                    Some((*worker_id, control_stream))
1001                }
1002                WorkerNodeState::Reconnecting(_) => None,
1003            })
1004    }
1005
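    /// Broadcast a barrier for the given partial graph to every connected worker, attaching
    /// the actors each worker must collect, the state tables to sync, and any actors to build.
    /// Returns a `NodeToCollect` map recording, per connected worker, whether its actor set
    /// for this barrier is empty. Fails if a worker that hosts target actors is not connected.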
1006    pub(super) fn inject_barrier(
1007        &mut self,
1008        database_id: DatabaseId,
1009        creating_job_id: Option<JobId>,
1010        mutation: Option<Mutation>,
1011        barrier_info: &BarrierInfo,
1012        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
1013        table_ids_to_sync: impl Iterator<Item = TableId>,
1014        mut new_actors: Option<StreamJobActorsToCreate>,
1015    ) -> MetaResult<NodeToCollect> {
1016        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
1017            "inject_barrier_err"
1018        ));
1019
1020        let partial_graph_id = to_partial_graph_id(creating_job_id);
1021
1022        for worker_id in node_actors.keys() {
1023            if let Some((_, worker_state)) = self.workers.get(worker_id)
1024                && let WorkerNodeState::Connected { .. } = worker_state
1025            {
1026            } else {
1027                return Err(anyhow!("unconnected worker node {}", worker_id).into());
1028            }
1029        }
1030
1031        let mut node_need_collect = HashMap::new();
1032        let table_ids_to_sync = table_ids_to_sync.collect_vec();
1033
1034        self.connected_workers()
1035            .try_for_each(|(node_id, node)| {
1036                let actor_ids_to_collect = node_actors
1037                    .get(&node_id)
1038                    .map(|actors| actors.iter().cloned())
1039                    .into_iter()
1040                    .flatten()
1041                    .collect_vec();
1042                let is_empty = actor_ids_to_collect.is_empty();
1043                {
1044                    let mutation = mutation.clone();
1045                    let barrier = Barrier {
1046                        epoch: Some(risingwave_pb::data::Epoch {
1047                            curr: barrier_info.curr_epoch(),
1048                            prev: barrier_info.prev_epoch(),
1049                        }),
1050                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
1051                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
1052                            .to_protobuf(),
1053                        kind: barrier_info.kind.to_protobuf() as i32,
1054                    };
1055
1056                    node.handle
1057                        .request_sender
1058                        .send(StreamingControlStreamRequest {
1059                            request: Some(
1060                                streaming_control_stream_request::Request::InjectBarrier(
1061                                    InjectBarrierRequest {
1062                                        request_id: Uuid::new_v4().to_string(),
1063                                        barrier: Some(barrier),
1064                                        database_id,
1065                                        actor_ids_to_collect,
1066                                        table_ids_to_sync: table_ids_to_sync.clone(),
1067                                        partial_graph_id,
1068                                        actors_to_build: new_actors
1069                                            .as_mut()
1070                                            .map(|new_actors| new_actors.remove(&(node_id as _)))
1071                                            .into_iter()
1072                                            .flatten()
1073                                            .flatten()
1074                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
1075                                                FragmentBuildActorInfo {
1076                                                    fragment_id,
1077                                                    node: Some(node),
1078                                                    actors: actors
1079                                                        .into_iter()
1080                                                        .map(|(actor, upstreams, dispatchers)| {
1081                                                            BuildActorInfo {
1082                                                                actor_id: actor.actor_id,
1083                                                                fragment_upstreams: upstreams
1084                                                                    .into_iter()
1085                                                                    .map(|(fragment_id, upstreams)| {
1086                                                                        (
1087                                                                            fragment_id,
1088                                                                            UpstreamActors {
1089                                                                                actors: upstreams
1090                                                                                    .into_values()
1091                                                                                    .collect(),
1092                                                                            },
1093                                                                        )
1094                                                                    })
1095                                                                    .collect(),
1096                                                                dispatchers,
1097                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
1098                                                                mview_definition: actor.mview_definition,
1099                                                                expr_context: actor.expr_context,
1100                                                                config_override: actor.config_override.to_string(),
1101                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
1102                                                            }
1103                                                        })
1104                                                        .collect(),
1105                                                }
1106                                            })
1107                                            .collect(),
1108                                    },
1109                                ),
1110                            ),
1111                        })
1112                        .map_err(|_| {
1113                            MetaError::from(anyhow!(
1114                                "failed to send request to {} {:?}",
1115                                node.worker_id,
1116                                node.host
1117                            ))
1118                        })?;
1119
1120                    node_need_collect.insert(node_id as WorkerId, is_empty);
1121                    Result::<_, MetaError>::Ok(())
1122                }
1123            })
1124            .inspect_err(|e| {
1125                // Record failure in event log.
1126                use risingwave_pb::meta::event_log;
1127                let event = event_log::EventInjectBarrierFail {
1128                    prev_epoch: barrier_info.prev_epoch(),
1129                    cur_epoch: barrier_info.curr_epoch(),
1130                    error: e.to_report_string(),
1131                };
1132                self.env
1133                    .event_log_manager_ref()
1134                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
1135            })?;
1136        Ok(node_need_collect)
1137    }
1138
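    /// Ask every connected worker to create the partial graph identified by `database_id` and
    /// `creating_job_id`. Send failures are logged and otherwise ignored.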
1139    pub(super) fn add_partial_graph(
1140        &mut self,
1141        database_id: DatabaseId,
1142        creating_job_id: Option<JobId>,
1143    ) {
1144        let partial_graph_id = to_partial_graph_id(creating_job_id);
1145        self.connected_workers().for_each(|(_, node)| {
1146            if node
1147                .handle
1148                .request_sender
1149                .send(StreamingControlStreamRequest {
1150                    request: Some(
1151                        streaming_control_stream_request::Request::CreatePartialGraph(
1152                            CreatePartialGraphRequest {
1153                                database_id,
1154                                partial_graph_id,
1155                            },
1156                        ),
1157                    ),
1158                }).is_err() {
1159                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker")
1160            }
1161        });
1162    }
1163
1164    pub(super) fn remove_partial_graph(
1165        &mut self,
1166        database_id: DatabaseId,
1167        creating_job_ids: Vec<JobId>,
1168    ) {
1169        if creating_job_ids.is_empty() {
1170            return;
1171        }
1172        let partial_graph_ids = creating_job_ids
1173            .into_iter()
1174            .map(|job_id| to_partial_graph_id(Some(job_id)))
1175            .collect_vec();
1176        self.connected_workers().for_each(|(_, node)| {
1177            if node.handle
1178                .request_sender
1179                .send(StreamingControlStreamRequest {
1180                    request: Some(
1181                        streaming_control_stream_request::Request::RemovePartialGraph(
1182                            RemovePartialGraphRequest {
1183                                partial_graph_ids: partial_graph_ids.clone(),
1184                                database_id,
1185                            },
1186                        ),
1187                    ),
1188                })
1189                .is_err()
1190            {
1191                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
1192            }
1193        })
1194    }
1195
1196    pub(super) fn reset_database(
1197        &mut self,
1198        database_id: DatabaseId,
1199        reset_request_id: u32,
1200    ) -> HashSet<WorkerId> {
1201        self.connected_workers()
1202            .filter_map(|(worker_id, node)| {
1203                if node
1204                    .handle
1205                    .request_sender
1206                    .send(StreamingControlStreamRequest {
1207                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
1208                            ResetDatabaseRequest {
1209                                database_id,
1210                                reset_request_id,
1211                            },
1212                        )),
1213                    })
1214                    .is_err()
1215                {
1216                    warn!(%worker_id, node = ?node.host, "failed to send reset database request");
1217                    None
1218                } else {
1219                    Some(worker_id)
1220                }
1221            })
1222            .collect()
1223    }
1224}
1225
1226impl GlobalBarrierWorkerContextImpl {
1227    pub(super) async fn new_control_stream_impl(
1228        &self,
1229        node: &WorkerNode,
1230        init_request: &PbInitRequest,
1231    ) -> MetaResult<StreamingControlHandle> {
1232        let handle = self
1233            .env
1234            .stream_client_pool()
1235            .get(node)
1236            .await?
1237            .start_streaming_control(init_request.clone())
1238            .await?;
1239        Ok(handle)
1240    }
1241}
1242
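/// Merge per-worker RPC errors into a single `MetaError`: a lone error is returned with the
/// worker id as context, scored errors are reduced to the one with the highest `Score`, and
/// unscored errors are concatenated into one message.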
1243pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
1244    message: &str,
1245    errors: impl IntoIterator<Item = (WorkerId, E)>,
1246) -> MetaError {
1247    use std::fmt::Write;
1248
1249    use risingwave_common::error::error_request_copy;
1250    use risingwave_common::error::tonic::extra::Score;
1251
1252    let errors = errors.into_iter().collect_vec();
1253
1254    if errors.is_empty() {
1255        return anyhow!(message.to_owned()).into();
1256    }
1257
1258    // Create the error from the single error.
1259    let single_error = |(worker_id, e)| {
1260        anyhow::Error::from(e)
1261            .context(format!("{message}, in worker node {worker_id}"))
1262            .into()
1263    };
1264
1265    if errors.len() == 1 {
1266        return single_error(errors.into_iter().next().unwrap());
1267    }
1268
1269    // Find the error with the highest score.
1270    let max_score = errors
1271        .iter()
1272        .filter_map(|(_, e)| error_request_copy::<Score>(e))
1273        .max();
1274
1275    if let Some(max_score) = max_score {
1276        let mut errors = errors;
1277        let max_scored = errors
1278            .extract_if(.., |(_, e)| {
1279                error_request_copy::<Score>(e) == Some(max_score)
1280            })
1281            .next()
1282            .unwrap();
1283
1284        return single_error(max_scored);
1285    }
1286
1287    // The errors do not have scores, so simply concatenate them.
1288    let concat: String = errors
1289        .into_iter()
1290        .fold(format!("{message}: "), |mut s, (w, e)| {
1291            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
1292            s
1293        });
1294    anyhow!(concat).into()
1295}