risingwave_meta/barrier/rpc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, TracedEpoch};
use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{
    BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo, SubscriberType,
};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, FragmentId, StreamActor, StreamJobActorsToCreate, SubscriptionId};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

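/// Encode an optional creating job id as the partial graph id sent to compute nodes.
/// `None` (the database-level graph) is encoded as `u32::MAX`, so a job id must never be `u32::MAX`.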
fn to_partial_graph_id(job_id: Option<JobId>) -> u32 {
    job_id
        .map(|job_id| {
            assert_ne!(job_id.as_raw_id(), u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX)
}

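/// Decode a partial graph id back into an optional creating job id; the inverse of
/// `to_partial_graph_id`, e.g. `u32::MAX` maps back to `None`.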
pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<JobId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(JobId::new(partial_graph_id))
    }
}

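/// A live control stream to a single worker node.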
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

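/// Connection state of a worker node tracked by [`ControlStreamManager`]: either connected with
/// an active control stream (possibly already marked as removed), or reconnecting via a future
/// that resolves to a new [`StreamingControlHandle`].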
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

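/// Manages the control streams between the meta node and all streaming worker nodes, which are
/// used to inject barriers and collect their completion responses.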
pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

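    /// Register a newly added worker node and establish its control stream, retrying a bounded
    /// number of times with exponential backoff. Once connected, the existing partial graphs are
    /// re-created on the worker.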
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(inflight_infos);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    // It may happen that the DNS information of the newly registered worker node
                    // has not yet been propagated to the meta node, causing an error. Wait for a while and retry.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id as _) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

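    /// Return a future that keeps trying to establish a control stream to the worker with
    /// exponential backoff (capped at 1 minute) until it succeeds.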
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }
        .boxed()
    }

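    /// Rebuild the manager during recovery by connecting to all given worker nodes. Workers that
    /// cannot be connected immediately are kept in the `Reconnecting` state.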
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id as _,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}

pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(
        self,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) {
        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(request),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
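    /// Poll all workers for the next event: a response from a connected control stream, or, when
    /// `poll_reconnect` is true, the completion of a pending reconnection. On a stream error the
    /// worker is dropped if it has been removed or is shutting down, and otherwise moved back to
    /// the `Reconnecting` state.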
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id as _,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) {(false, err.into())}).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                                .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id as WorkerId);
                                                }
                                                Ok(resp)
                                            },
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

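    /// Wait for the next event from any worker, including reconnection completions.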
    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

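    /// Wait for the next response from any connected worker, without polling pending reconnections.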
    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }

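    /// Build the `CreatePartialGraph` requests that re-create the database-level graph and the
    /// graphs of all creating jobs on a newly (re)connected worker.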
    fn collect_init_partial_graph(
        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
            [PbCreatePartialGraphRequest {
                partial_graph_id: to_partial_graph_id(None),
                database_id: database_id.into(),
            }]
            .into_iter()
            .chain(
                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
                    partial_graph_id: to_partial_graph_id(Some(job_id)),
                    database_id: database_id.into(),
                }),
            )
        })
    }
}

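/// Tracks collection of the initial barrier injected for a single database during recovery.
/// Once fully collected, it is converted into a [`DatabaseCheckpointControl`] via `finish`.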
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    committed_epoch: u64,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.as_raw_id(), resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
            self.cdc_table_backfill_tracker,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<JobId, String>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_split_assignment: &mut CdcTableSnapshotSplitAssignmentWithGeneration,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|fragments| fragments.values())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment = jobs
            .values()
            .flat_map(|fragments| fragments.values())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                cdc_table_snapshot_split_assignment
                    .splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment =
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                database_cdc_table_snapshot_split_assignment,
                cdc_table_snapshot_split_assignment.generation,
            );
        let mutation = Mutation::Add(AddMutation {
            // Actors built during recovery are not treated as newly added actors.
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    database_cdc_table_snapshot_split_assignment,
                )
                .into(),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            // TODO(kwannoel): recover using backfill order plan
            backfill_nodes_to_pause: Default::default(),
            new_upstream_sinks: Default::default(),
        });

        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (subscription_id, SubscriberType::Subscription(retention))
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if let Some(definition) = background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job_fragments, definition));
                } else {
                    database_jobs.insert(job_id, job_fragments);
                    background_mviews.insert(job_id, definition);
                }
            } else {
                database_jobs.insert(job_id, job_fragments);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|job| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        // Use a different `curr_epoch` for each recovery attempt.
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, (fragment_infos, definition)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, definition)
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, fragment_infos)
                    .expect("non-duplicate");
                continue;
            }
            let info = InflightStreamingJobInfo {
                job_id,
                fragment_infos,
                subscribers: Default::default(), /* no subscriber for ongoing snapshot backfill jobs */
            };
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                info.fragment_infos()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, fragment_infos)| {
                    (
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                        },
                    )
                })
                .collect()
        };

        let node_to_collect = {
            let node_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_info| {
                        (
                            fragment_info.fragment_id,
                            &fragment_info.nodes,
                            fragment_info.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
            )?;
            debug!(
                ?node_to_collect,
                %database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews.iter().map(|(table_id, definition)| {
                (
                    *table_id,
                    (
                        definition.clone(),
                        &database_jobs[table_id],
                        Default::default(),
                    ),
                )
            }),
            hummock_version_stats,
        );

        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, definition, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(|fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                        info.subscribers.keys().copied(),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs
                .values()
                .chain(
                    creating_streaming_job_controls
                        .values()
                        .map(|job| job.graph_info()),
                )
                .flat_map(|info| {
                    info.fragment_infos()
                        .map(move |fragment| (fragment, info.job_id))
                }),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_state = BarrierWorkerState::recovery(
            database_id,
            self.env.shared_actor_infos().clone(),
            new_epoch,
            database_jobs.into_values(),
            is_paused,
        );
        let cdc_table_backfill_tracker = self.env.cdc_table_backfill_tracker();
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
            cdc_table_backfill_tracker,
        })
    }

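    /// Inject a barrier carrying the mutation of an optional command into the database-level
    /// partial graph.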
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
        )
    }

    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

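    /// Send an `InjectBarrier` request to every connected worker for the given partial graph.
    /// Returns, per worker, whether there is no actor to collect from it. Errors if any worker
    /// that must collect actors is not currently connected.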
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_job_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if let Some((_, worker_state)) = self.workers.get(worker_id)
                && let WorkerNodeState::Connected { .. } = worker_state
            {
            } else {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.as_raw_id())
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_workers()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(&node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.as_raw_id(),
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

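    /// Ask every connected worker to create a partial graph for the given database (and
    /// optionally a creating job).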
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.as_raw_id(),
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

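    /// Ask every connected worker to remove the partial graphs of the given creating jobs.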
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<JobId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_workers().for_each(|(_, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.as_raw_id(),
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

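    /// Send a `ResetDatabase` request to every connected worker and return the ids of the workers
    /// the request was successfully sent to.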
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.as_raw_id(),
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

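/// Merge per-worker RPC errors into a single [`MetaError`]: a single error is returned as-is,
/// the error with the highest score wins when scores are attached, and otherwise the errors are
/// concatenated into one message.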
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    // Build the error from a single (worker id, error) pair.
    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    // Find the error with the highest score.
    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    // The errors do not have scores, so simply concatenate them.
    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}