risingwave_meta/barrier/rpc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, TracedEpoch};
use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightStreamingJobInfo, SubscriberType,
};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, FragmentId, StreamActor, StreamJobActorsToCreate, SubscriptionId};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

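// A `None` job id denotes the database-level partial graph and is encoded with the sentinel
// value `u32::MAX`, while the partial graph of a creating streaming job reuses the job's raw
// id (hence the `assert_ne!` guard below).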
fn to_partial_graph_id(job_id: Option<JobId>) -> u32 {
    job_id
        .map(|job_id| {
            assert_ne!(job_id, u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX)
}

pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<JobId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(JobId::new(partial_graph_id))
    }
}

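/// A connected control stream to a single worker node, pairing the worker's identity with the
/// RPC handle used to send streaming control requests and receive responses.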
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

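/// Per-worker connection state: either an established control stream (possibly already marked
/// as removed from the cluster) or a pending reconnect future that resolves to a new
/// `StreamingControlHandle`.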
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

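/// Manages the control streams between the meta node and all compute worker nodes. It owns one
/// `WorkerNodeState` per worker and is the entry point for injecting barriers, managing
/// partial graphs, and collecting barrier-complete responses.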
pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

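    /// Register a newly joined worker node and establish its control stream, retrying a bounded
    /// number of times with exponential backoff. On success the stream is initialized with the
    /// currently in-flight partial graphs.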
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = %node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = %node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(inflight_infos);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    // The DNS information of a newly registered worker node may not have been
                    // propagated to the meta node yet, which causes an error. Wait for a while and retry.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = %node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = %node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

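    /// Build a future that repeatedly tries to establish a control stream to `node` with
    /// exponential backoff (capped at one minute), resolving once a connection succeeds.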
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }.boxed()
    }

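    /// Rebuild the manager during global recovery: connect to every known worker in parallel,
    /// and move workers that cannot be reached immediately into the `Reconnecting` state.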
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        %worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}

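/// Borrowed view of a freshly (re)connected worker, used to send the initial
/// `CreatePartialGraph` requests for all in-flight databases and creating jobs.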
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(
        self,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) {
        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(request),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

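/// Event yielded by `ControlStreamManager::next_event`: either a response received from a
/// worker's control stream, or a worker that has just (re)connected.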
pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
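    /// Poll all workers for the next event. Connected workers are polled for responses; when
    /// `poll_reconnect` is set, reconnecting workers are also polled and surface a `Connected`
    /// event once their retry future resolves. On a stream error the worker is either dropped
    /// (if removed or shutting down) or moved back to `Reconnecting`.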
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=%node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) {(false, err.into())}).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                                .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id);
                                                }
                                                Ok(resp)
                                            },
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

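    /// Wait for the next event from any worker node, polling pending reconnects as well as
    /// connected response streams.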
    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }

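    /// Build the `CreatePartialGraph` requests sent to a newly connected worker: one for each
    /// database-level graph plus one for every in-flight creating streaming job.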
    fn collect_init_partial_graph(
        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
            [PbCreatePartialGraphRequest {
                partial_graph_id: to_partial_graph_id(None),
                database_id,
            }]
            .into_iter()
            .chain(
                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
                    partial_graph_id: to_partial_graph_id(Some(job_id)),
                    database_id,
                }),
            )
        })
    }
}

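/// Tracks collection of the initial barrier injected during per-database recovery. Once all
/// expected worker responses (including those of recovered snapshot backfill jobs) have been
/// collected, it can be converted into a `DatabaseCheckpointControl` via `finish`.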
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    committed_epoch: u64,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(self.node_to_collect.remove(&resp.worker_id).is_some());
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
            self.cdc_table_backfill_tracker,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<JobId, String>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_split_assignment: &mut CdcTableSnapshotSplitAssignmentWithGeneration,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|fragments| fragments.values())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment = jobs
            .values()
            .flat_map(|fragments| fragments.values())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                cdc_table_snapshot_split_assignment
                    .splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment =
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                database_cdc_table_snapshot_split_assignment,
                cdc_table_snapshot_split_assignment.generation,
            );
        let mutation = Mutation::Add(AddMutation {
            // Actors built during recovery are not treated as newly added actors.
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    database_cdc_table_snapshot_split_assignment,
                )
                .into(),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            // TODO(kwannoel): recover using backfill order plan
            backfill_nodes_to_pause: Default::default(),
            new_upstream_sinks: Default::default(),
        });

        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (
                                        subscription_id.as_raw_id(),
                                        SubscriberType::Subscription(retention),
                                    )
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if let Some(definition) = background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job_fragments, definition));
                } else {
                    database_jobs.insert(job_id, (job_fragments, Some(definition)));
                }
            } else {
                database_jobs.insert(job_id, (job_fragments, None));
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|(job, _)| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        // Use a different `curr_epoch` for each recovery attempt.
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, (fragment_infos, definition)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                database_jobs
                    .try_insert(job_id, (fragment_infos, Some(definition)))
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                fragment_infos
                    .values()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.as_raw_id(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        fragment_infos,
                        definition,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, (fragment_infos, background_job_definition))| {
                    let status = if let Some(definition) = background_job_definition {
                        CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::recover(
                            job_id,
                            definition,
                            &fragment_infos,
                            Default::default(),
                            hummock_version_stats,
                        ))
                    } else {
                        CreateStreamingJobStatus::Created
                    };
                    (
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                            status,
                        },
                    )
                })
                .collect()
        };

        let node_to_collect = {
            let new_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_infos| {
                        (
                            fragment_infos.fragment_id,
                            &fragment_infos.nodes,
                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let nodes_actors =
                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                &nodes_actors,
                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
                Some(new_actors),
            )?;
            debug!(
                ?node_to_collect,
                %database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, definition, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
                (
                    fragment_infos.fragment_id,
                    &fragment_infos.nodes,
                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                    vec![], // no subscribers for backfilling jobs
                )
            }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs
                .values()
                .flat_map(|info| {
                    info.fragment_infos()
                        .map(move |fragment| (fragment, info.job_id))
                })
                .chain(
                    creating_streaming_job_controls
                        .values()
                        .flat_map(|job| job.fragment_infos_with_job_id()),
                ),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_state = BarrierWorkerState::recovery(
            database_id,
            self.env.shared_actor_infos().clone(),
            new_epoch,
            database_jobs.into_values(),
            is_paused,
        );
        let cdc_table_backfill_tracker = self.env.cdc_table_backfill_tracker();
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            creating_streaming_job_controls,
            committed_epoch,
            cdc_table_backfill_tracker,
        })
    }

    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

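    /// Inject a barrier for the given partial graph into all connected workers. Every worker
    /// that hosts actors for the graph must be connected; the returned `NodeToCollect` maps
    /// each connected worker to whether it has no actors to collect for this barrier.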
    pub(super) fn inject_barrier(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        table_ids_to_sync: impl Iterator<Item = TableId>,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_job_id);

        for worker_id in node_actors.keys() {
            if let Some((_, worker_state)) = self.workers.get(worker_id)
                && let WorkerNodeState::Connected { .. } = worker_state
            {
            } else {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let mut node_need_collect = HashMap::new();
        let table_ids_to_sync = table_ids_to_sync.collect_vec();

        self.connected_workers()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(&node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync.clone(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                config_override: actor.config_override.to_string(),
                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

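    /// Register a new partial graph (the database-level graph or a creating job's graph) on all
    /// connected workers. A failure to send the request is logged and otherwise ignored.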
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                }).is_err() {
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

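    /// Remove the partial graphs of the given creating jobs from all connected workers. A
    /// failure to send the request is logged and otherwise ignored.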
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<JobId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_workers().for_each(|(_, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

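    /// Ask all connected workers to reset the given database, returning the set of workers to
    /// which the request was successfully sent.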
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(%worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

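/// Merge per-worker RPC errors into a single `MetaError`: a lone error is returned directly,
/// otherwise the error with the highest `Score` is preferred, and unscored errors are
/// concatenated into one message.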
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    // Create the error from the single error.
    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    // Find the error with the highest score.
    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    // The errors do not have scores, so simply concatenate them.
    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}