risingwave_meta/barrier/rpc.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::error::Error;
18use std::fmt::{Debug, Formatter};
19use std::future::poll_fn;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22use std::time::Duration;
23
24use anyhow::anyhow;
25use fail::fail_point;
26use futures::future::{BoxFuture, join_all};
27use futures::{FutureExt, StreamExt};
28use itertools::Itertools;
29use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
30use risingwave_common::id::JobId;
31use risingwave_common::util::epoch::Epoch;
32use risingwave_common::util::tracing::TracingContext;
33use risingwave_connector::source::SplitImpl;
34use risingwave_connector::source::cdc::{
35    CdcTableSnapshotSplitAssignmentWithGeneration,
36    build_pb_actor_cdc_table_snapshot_splits_with_generation,
37};
38use risingwave_meta_model::WorkerId;
39use risingwave_pb::common::{HostAddress, WorkerNode};
40use risingwave_pb::hummock::HummockVersionStats;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
43use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
44use risingwave_pb::stream_service::inject_barrier_request::{
45    BuildActorInfo, FragmentBuildActorInfo,
46};
47use risingwave_pb::stream_service::streaming_control_stream_request::{
48    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
49    RemovePartialGraphRequest, ResetDatabaseRequest,
50};
51use risingwave_pb::stream_service::{
52    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
53    streaming_control_stream_request, streaming_control_stream_response,
54};
55use risingwave_rpc_client::StreamingControlHandle;
56use thiserror_ext::AsReport;
57use tokio::time::{Instant, sleep};
58use tokio_retry::strategy::ExponentialBackoff;
59use tracing::{debug, error, info, warn};
60use uuid::Uuid;
61
62use super::{BarrierKind, Command, TracedEpoch};
63use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
64use crate::barrier::checkpoint::{
65    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
66};
67use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
68use crate::barrier::edge_builder::FragmentEdgeBuildResult;
69use crate::barrier::info::{
70    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
71    SubscriberType,
72};
73use crate::barrier::progress::CreateMviewProgressTracker;
74use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
75use crate::controller::fragment::InflightFragmentInfo;
76use crate::manager::MetaSrvEnv;
77use crate::model::{ActorId, FragmentId, StreamActor, StreamJobActorsToCreate, SubscriptionId};
78use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
79use crate::{MetaError, MetaResult};
80
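/// Encode an optional creating-job id as a partial graph id.
///
/// `u32::MAX` is reserved for the database-level partial graph (no creating job), so a
/// creating job's raw id must never equal `u32::MAX`; the assertion below enforces this.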
81fn to_partial_graph_id(job_id: Option<JobId>) -> u32 {
82    job_id
83        .map(|job_id| {
84            assert_ne!(job_id, u32::MAX);
85            job_id.as_raw_id()
86        })
87        .unwrap_or(u32::MAX)
88}
89
90pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<JobId> {
91    if partial_graph_id == u32::MAX {
92        None
93    } else {
94        Some(JobId::new(partial_graph_id))
95    }
96}
97
98struct ControlStreamNode {
99    worker_id: WorkerId,
100    host: HostAddress,
101    handle: StreamingControlHandle,
102}
103
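/// Connection state of a worker node's control stream.
///
/// A `Connected` worker that has been removed from the cluster is only marked `removed: true`
/// and is dropped from the map once its response stream terminates. A `Reconnecting` worker
/// holds a future that keeps retrying to re-establish the control stream.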
104enum WorkerNodeState {
105    Connected {
106        control_stream: ControlStreamNode,
107        removed: bool,
108    },
109    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
110}
111
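/// Manages the bidirectional streaming control streams between the meta node and all worker
/// nodes, over which barriers are injected and barrier completion responses are collected.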
112pub(super) struct ControlStreamManager {
113    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
114    pub env: MetaSrvEnv,
115}
116
117impl ControlStreamManager {
118    pub(super) fn new(env: MetaSrvEnv) -> Self {
119        Self {
120            workers: Default::default(),
121            env,
122        }
123    }
124
125    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
126        self.workers[&worker_id].0.host.clone().unwrap()
127    }
128
129    pub(super) async fn add_worker(
130        &mut self,
131        node: WorkerNode,
132        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
133        term_id: String,
134        context: &impl GlobalBarrierWorkerContext,
135    ) {
136        let node_id = node.id;
137        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
138            let (existing_node, worker_state) = entry.get();
139            assert_eq!(existing_node.host, node.host);
140            warn!(id = %node.id, host = ?node.host, "node already exists");
141            match worker_state {
142                WorkerNodeState::Connected { .. } => {
143                    warn!(id = %node.id, host = ?node.host, "new node already connected");
144                    return;
145                }
146                WorkerNodeState::Reconnecting(_) => {
147                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
148                    entry.remove();
149                }
150            }
151        }
152        let node_host = node.host.clone().unwrap();
153        let mut backoff = ExponentialBackoff::from_millis(100)
154            .max_delay(Duration::from_secs(3))
155            .factor(5);
156        const MAX_RETRY: usize = 5;
157        for i in 1..=MAX_RETRY {
158            match context
159                .new_control_stream(
160                    &node,
161                    &PbInitRequest {
162                        term_id: term_id.clone(),
163                    },
164                )
165                .await
166            {
167                Ok(mut handle) => {
168                    WorkerNodeConnected {
169                        handle: &mut handle,
170                        node: &node,
171                    }
172                    .initialize(inflight_infos);
173                    info!(?node_host, "add control stream worker");
174                    assert!(
175                        self.workers
176                            .insert(
177                                node_id,
178                                (
179                                    node,
180                                    WorkerNodeState::Connected {
181                                        control_stream: ControlStreamNode {
182                                            worker_id: node_id as _,
183                                            host: node_host,
184                                            handle,
185                                        },
186                                        removed: false
187                                    }
188                                )
189                            )
190                            .is_none()
191                    );
192                    return;
193                }
194                Err(e) => {
195                    // It may happen that the DNS information of a newly registered worker node
196                    // has not yet been propagated to the meta node, causing an error. Wait a while and retry.
197                    let delay = backoff.next().unwrap();
198                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
199                    sleep(delay).await;
200                }
201            }
202        }
203        error!(?node_host, "failed to create control stream to worker node after retries");
204    }
205
206    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
207        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
208            let (_, worker_state) = entry.get_mut();
209            match worker_state {
210                WorkerNodeState::Connected { removed, .. } => {
211                    info!(worker_id = %node.id, "mark connected worker as removed");
212                    *removed = true;
213                }
214                WorkerNodeState::Reconnecting(_) => {
215                    info!(worker_id = %node.id, "remove worker");
216                    entry.remove();
217                }
218            }
219        }
220    }
221
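    /// Build a future that retries connecting to `node` with exponential backoff (capped at
    /// one minute) until a control stream is established. The backoff iterator is infinite,
    /// so the future only completes successfully.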
222    fn retry_connect(
223        node: WorkerNode,
224        term_id: String,
225        context: Arc<impl GlobalBarrierWorkerContext>,
226    ) -> BoxFuture<'static, StreamingControlHandle> {
227        async move {
228            let mut attempt = 0;
229            let backoff = ExponentialBackoff::from_millis(100)
230                .max_delay(Duration::from_mins(1))
231                .factor(5);
232            let init_request = PbInitRequest { term_id };
233            for delay in backoff {
234                attempt += 1;
235                sleep(delay).await;
236                match context.new_control_stream(&node, &init_request).await {
237                    Ok(handle) => {
238                        return handle;
239                    }
240                    Err(e) => {
241                        warn!(e = %e.as_report(), ?node, attempt, "failed to create control stream to worker");
242                    }
243                }
244            }
245            unreachable!("end of retry backoff")
246        }.boxed()
247    }
248
249    pub(super) async fn recover(
250        env: MetaSrvEnv,
251        nodes: &HashMap<WorkerId, WorkerNode>,
252        term_id: &str,
253        context: Arc<impl GlobalBarrierWorkerContext>,
254    ) -> Self {
255        let reset_start_time = Instant::now();
256        let init_request = PbInitRequest {
257            term_id: term_id.to_owned(),
258        };
259        let init_request = &init_request;
260        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
261            let result = context.new_control_stream(node, init_request).await;
262            (*worker_id, node.clone(), result)
263        }))
264        .await;
265        let mut unconnected_workers = HashSet::new();
266        let mut workers = HashMap::new();
267        for (worker_id, node, result) in nodes {
268            match result {
269                Ok(handle) => {
270                    let control_stream = ControlStreamNode {
271                        worker_id: node.id,
272                        host: node.host.clone().unwrap(),
273                        handle,
274                    };
275                    assert!(
276                        workers
277                            .insert(
278                                worker_id,
279                                (
280                                    node,
281                                    WorkerNodeState::Connected {
282                                        control_stream,
283                                        removed: false
284                                    }
285                                )
286                            )
287                            .is_none()
288                    );
289                }
290                Err(e) => {
291                    unconnected_workers.insert(worker_id);
292                    warn!(
293                        e = %e.as_report(),
294                        %worker_id,
295                        ?node,
296                        "failed to connect to node"
297                    );
298                    assert!(
299                        workers
300                            .insert(
301                                worker_id,
302                                (
303                                    node.clone(),
304                                    WorkerNodeState::Reconnecting(Self::retry_connect(
305                                        node,
306                                        term_id.to_owned(),
307                                        context.clone()
308                                    ))
309                                )
310                            )
311                            .is_none()
312                    );
313                }
314            }
315        }
316
317        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");
318
319        Self { workers, env }
320    }
321
322    /// Clear all nodes and response streams in the manager.
323    pub(super) fn clear(&mut self) {
324        *self = Self::new(self.env.clone());
325    }
326}
327
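/// A freshly (re)connected worker node, exposing its control stream handle so that the
/// initial `CreatePartialGraph` requests for all in-flight databases and creating jobs can
/// be sent via [`Self::initialize`].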
328pub(super) struct WorkerNodeConnected<'a> {
329    node: &'a WorkerNode,
330    handle: &'a mut StreamingControlHandle,
331}
332
333impl<'a> WorkerNodeConnected<'a> {
334    pub(super) fn initialize(
335        self,
336        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
337    ) {
338        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
339            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
340                request: Some(
341                    streaming_control_stream_request::Request::CreatePartialGraph(request),
342                ),
343            }) {
344                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
345            }
346        }
347    }
348}
349
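/// Event yielded by [`ControlStreamManager::next_event`]: either a response (or error) from a
/// connected worker, or a notification that a previously disconnected worker has reconnected
/// and should be re-initialized with its partial graphs.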
350pub(super) enum WorkerNodeEvent<'a> {
351    Response(MetaResult<streaming_control_stream_response::Response>),
352    Connected(WorkerNodeConnected<'a>),
353}
354
355impl ControlStreamManager {
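    /// Poll all workers for the next event.
    ///
    /// `this_opt` is passed as an `Option` so that the reconnect arm can `take()` the
    /// `&'a mut Self` out of it and return an event that borrows from the manager for the
    /// full lifetime `'a`, rather than only for the duration of this poll.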
356    fn poll_next_event<'a>(
357        this_opt: &mut Option<&'a mut Self>,
358        cx: &mut Context<'_>,
359        term_id: &str,
360        context: &Arc<impl GlobalBarrierWorkerContext>,
361        poll_reconnect: bool,
362    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
363        let this = this_opt.as_mut().expect("Future polled after completion");
364        if this.workers.is_empty() {
365            return Poll::Pending;
366        }
367        {
368            for (&worker_id, (node, worker_state)) in &mut this.workers {
369                let control_stream = match worker_state {
370                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
371                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
372                        continue;
373                    }
374                    WorkerNodeState::Reconnecting(join_handle) => {
375                        match join_handle.poll_unpin(cx) {
376                            Poll::Ready(handle) => {
377                                info!(id=%node.id, host=?node.host, "reconnected to worker");
378                                *worker_state = WorkerNodeState::Connected {
379                                    control_stream: ControlStreamNode {
380                                        worker_id: node.id,
381                                        host: node.host.clone().unwrap(),
382                                        handle,
383                                    },
384                                    removed: false,
385                                };
386                                let this = this_opt.take().expect("should exist");
387                                let (node, worker_state) =
388                                    this.workers.get_mut(&worker_id).expect("should exist");
389                                let WorkerNodeState::Connected { control_stream, .. } =
390                                    worker_state
391                                else {
392                                    unreachable!()
393                                };
394                                return Poll::Ready((
395                                    worker_id,
396                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
397                                        node,
398                                        handle: &mut control_stream.handle,
399                                    }),
400                                ));
401                            }
402                            Poll::Pending => {
403                                continue;
404                            }
405                        }
406                    }
407                };
408                match control_stream.handle.response_stream.poll_next_unpin(cx) {
409                    Poll::Ready(result) => {
410                        {
411                            let result = result
412                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
413                                .and_then(|result| {
414                                    result.map_err(|err| -> (bool, MetaError) {(false, err.into())}).and_then(|resp| {
415                                        match resp
416                                            .response
417                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
418                                        {
419                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
420                                                "worker node {worker_id} is shutting down"
421                                            )
422                                                .into())),
423                                            streaming_control_stream_response::Response::Init(_) => {
424                                                // This arm should be unreachable.
425                                                Err((false, anyhow!("get unexpected init response").into()))
426                                            }
427                                            resp => {
428                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
429                                                    assert_eq!(worker_id, barrier_resp.worker_id);
430                                                }
431                                                Ok(resp)
432                                            },
433                                        }
434                                    })
435                                });
436                            let result = match result {
437                                Ok(resp) => Ok(resp),
438                                Err((shutdown, err)) => {
439                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "received error from response stream");
440                                    let WorkerNodeState::Connected { removed, .. } = worker_state
441                                    else {
442                                        unreachable!("checked connected")
443                                    };
444                                    if *removed || shutdown {
445                                        this.workers.remove(&worker_id);
446                                    } else {
447                                        *worker_state = WorkerNodeState::Reconnecting(
448                                            ControlStreamManager::retry_connect(
449                                                node.clone(),
450                                                term_id.to_owned(),
451                                                context.clone(),
452                                            ),
453                                        );
454                                    }
455                                    Err(err)
456                                }
457                            };
458                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
459                        }
460                    }
461                    Poll::Pending => {
462                        continue;
463                    }
464                }
465            }
466        };
467
468        Poll::Pending
469    }
470
471    #[await_tree::instrument("control_stream_next_event")]
472    pub(super) async fn next_event<'a>(
473        &'a mut self,
474        term_id: &str,
475        context: &Arc<impl GlobalBarrierWorkerContext>,
476    ) -> (WorkerId, WorkerNodeEvent<'a>) {
477        let mut this = Some(self);
478        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
479    }
480
481    #[await_tree::instrument("control_stream_next_response")]
482    pub(super) async fn next_response(
483        &mut self,
484        term_id: &str,
485        context: &Arc<impl GlobalBarrierWorkerContext>,
486    ) -> (
487        WorkerId,
488        MetaResult<streaming_control_stream_response::Response>,
489    ) {
490        let mut this = Some(self);
491        let (worker_id, event) =
492            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
493        match event {
494            WorkerNodeEvent::Response(result) => (worker_id, result),
495            WorkerNodeEvent::Connected(_) => {
496                unreachable!("Connected event cannot be returned when poll_reconnect is false")
497            }
498        }
499    }
500
501    fn collect_init_partial_graph(
502        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
503    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
504        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
505            [PbCreatePartialGraphRequest {
506                partial_graph_id: to_partial_graph_id(None),
507                database_id,
508            }]
509            .into_iter()
510            .chain(
511                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
512                    partial_graph_id: to_partial_graph_id(Some(job_id)),
513                    database_id,
514                }),
515            )
516        })
517    }
518}
519
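/// Tracks collection of the initial barrier injected for a single database during recovery.
/// Once all worker nodes and all recovered creating (snapshot backfill) jobs have collected
/// the barrier, [`Self::finish`] converts the collector into a [`DatabaseCheckpointControl`].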
520pub(super) struct DatabaseInitialBarrierCollector {
521    database_id: DatabaseId,
522    node_to_collect: NodeToCollect,
523    database_state: BarrierWorkerState,
524    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
525    committed_epoch: u64,
526    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
527}
528
529impl Debug for DatabaseInitialBarrierCollector {
530    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
531        f.debug_struct("DatabaseInitialBarrierCollector")
532            .field("database_id", &self.database_id)
533            .field("node_to_collect", &self.node_to_collect)
534            .finish()
535    }
536}
537
538impl DatabaseInitialBarrierCollector {
539    pub(super) fn is_collected(&self) -> bool {
540        self.node_to_collect.is_empty()
541            && self
542                .creating_streaming_job_controls
543                .values()
544                .all(|job| job.is_empty())
545    }
546
547    pub(super) fn database_state(
548        &self,
549    ) -> (
550        &BarrierWorkerState,
551        &HashMap<JobId, CreatingStreamingJobControl>,
552    ) {
553        (&self.database_state, &self.creating_streaming_job_controls)
554    }
555
556    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
557        assert_eq!(self.database_id, resp.database_id);
558        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
559            self.creating_streaming_job_controls
560                .get_mut(&creating_job_id)
561                .expect("should exist")
562                .collect(resp);
563        } else {
564            assert_eq!(resp.epoch, self.committed_epoch);
565            assert!(self.node_to_collect.remove(&resp.worker_id).is_some());
566        }
567    }
568
569    pub(super) fn finish(self) -> DatabaseCheckpointControl {
570        assert!(self.is_collected());
571        DatabaseCheckpointControl::recovery(
572            self.database_id,
573            self.database_state,
574            self.committed_epoch,
575            self.creating_streaming_job_controls,
576            self.cdc_table_backfill_tracker,
577        )
578    }
579
580    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
581        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
582            && self
583                .creating_streaming_job_controls
584                .values_mut()
585                .all(|job| job.is_valid_after_worker_err(worker_id))
586    }
587}
588
589impl ControlStreamManager {
590    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
591    #[expect(clippy::too_many_arguments)]
592    pub(super) fn inject_database_initial_barrier(
593        &mut self,
594        database_id: DatabaseId,
595        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
596        state_table_committed_epochs: &mut HashMap<TableId, u64>,
597        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
598        edges: &mut FragmentEdgeBuildResult,
599        stream_actors: &HashMap<ActorId, StreamActor>,
600        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
601        background_jobs: &mut HashMap<JobId, String>,
602        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
603        is_paused: bool,
604        hummock_version_stats: &HummockVersionStats,
605        cdc_table_snapshot_split_assignment: &mut CdcTableSnapshotSplitAssignmentWithGeneration,
606    ) -> MetaResult<DatabaseInitialBarrierCollector> {
607        self.add_partial_graph(database_id, None);
608        let source_split_assignments = jobs
609            .values()
610            .flat_map(|fragments| fragments.values())
611            .flat_map(|info| info.actors.keys())
612            .filter_map(|actor_id| {
613                let actor_id = *actor_id as ActorId;
614                source_splits
615                    .remove(&actor_id)
616                    .map(|splits| (actor_id, splits))
617            })
618            .collect();
619        let database_cdc_table_snapshot_split_assignment = jobs
620            .values()
621            .flat_map(|fragments| fragments.values())
622            .flat_map(|info| info.actors.keys())
623            .filter_map(|actor_id| {
624                let actor_id = *actor_id as ActorId;
625                cdc_table_snapshot_split_assignment
626                    .splits
627                    .remove(&actor_id)
628                    .map(|splits| (actor_id, splits))
629            })
630            .collect();
631        let database_cdc_table_snapshot_split_assignment =
632            CdcTableSnapshotSplitAssignmentWithGeneration::new(
633                database_cdc_table_snapshot_split_assignment,
634                cdc_table_snapshot_split_assignment.generation,
635            );
636        let mutation = Mutation::Add(AddMutation {
637            // Actors built during recovery are not treated as newly added actors.
638            actor_dispatchers: Default::default(),
639            added_actors: Default::default(),
640            actor_splits: build_actor_connector_splits(&source_split_assignments),
641            actor_cdc_table_snapshot_splits:
642                build_pb_actor_cdc_table_snapshot_splits_with_generation(
643                    database_cdc_table_snapshot_split_assignment,
644                )
645                .into(),
646            pause: is_paused,
647            subscriptions_to_add: Default::default(),
648            // TODO(kwannoel): recover using backfill order plan
649            backfill_nodes_to_pause: Default::default(),
650            new_upstream_sinks: Default::default(),
651        });
652
653        fn resolve_jobs_committed_epoch<'a>(
654            state_table_committed_epochs: &mut HashMap<TableId, u64>,
655            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
656        ) -> u64 {
657            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
658                (
659                    table_id,
660                    state_table_committed_epochs
661                        .remove(&table_id)
662                        .expect("should exist"),
663                )
664            });
665            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
666            for (table_id, epoch) in epochs {
667                assert_eq!(
668                    prev_epoch, epoch,
669                    "{} has a different committed epoch from {}",
670                    first_table_id, table_id
671                );
672            }
673            prev_epoch
674        }
675
676        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
677            .keys()
678            .filter_map(|job_id| {
679                mv_depended_subscriptions
680                    .remove(&job_id.as_mv_table_id())
681                    .map(|subscriptions| {
682                        (
683                            job_id.as_mv_table_id(),
684                            subscriptions
685                                .into_iter()
686                                .map(|(subscription_id, retention)| {
687                                    (
688                                        subscription_id.as_raw_id(),
689                                        SubscriberType::Subscription(retention),
690                                    )
691                                })
692                                .collect(),
693                        )
694                    })
695            })
696            .collect();
697
698        let mut database_jobs = HashMap::new();
699        let mut snapshot_backfill_jobs = HashMap::new();
700
701        for (job_id, job_fragments) in jobs {
702            if let Some(definition) = background_jobs.remove(&job_id) {
703                if job_fragments.values().any(|fragment| {
704                    fragment
705                        .fragment_type_mask
706                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
707                }) {
708                    debug!(%job_id, definition, "recovered snapshot backfill job");
709                    snapshot_backfill_jobs.insert(job_id, (job_fragments, definition));
710                } else {
711                    database_jobs.insert(job_id, (job_fragments, Some(definition)));
712                }
713            } else {
714                database_jobs.insert(job_id, (job_fragments, None));
715            }
716        }
717
718        let database_job_log_epochs: HashMap<_, _> = database_jobs
719            .keys()
720            .filter_map(|job_id| {
721                state_table_log_epochs
722                    .remove(&job_id.as_mv_table_id())
723                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
724            })
725            .collect();
726
727        let prev_epoch = resolve_jobs_committed_epoch(
728            state_table_committed_epochs,
729            database_jobs.values().flat_map(|(job, _)| job.values()),
730        );
731        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
732        // Use a different `curr_epoch` for each recovery attempt.
733        let curr_epoch = prev_epoch.next();
734        let barrier_info = BarrierInfo {
735            prev_epoch,
736            curr_epoch,
737            kind: BarrierKind::Initial,
738        };
739
740        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
741        for (job_id, (fragment_infos, definition)) in snapshot_backfill_jobs {
742            let committed_epoch =
743                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
744            if committed_epoch == barrier_info.prev_epoch() {
745                info!(
746                    "recovered creating snapshot backfill job {} has already caught up with upstream",
747                    job_id
748                );
749                database_jobs
750                    .try_insert(job_id, (fragment_infos, Some(definition)))
751                    .expect("non-duplicate");
752                continue;
753            }
754            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
755                fragment_infos
756                    .values()
757                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
758            )?
759            .0
760            .ok_or_else(|| {
761                anyhow!(
762                    "recovered snapshot backfill job {} has no snapshot backfill info",
763                    job_id
764                )
765            })?;
766            let mut snapshot_epoch = None;
767            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
768                .upstream_mv_table_id_to_backfill_epoch
769                .keys()
770                .cloned()
771                .collect();
772            for (upstream_table_id, epoch) in
773                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
774            {
775                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} has no snapshot epoch set for upstream {}", job_id, upstream_table_id))?;
776                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
777                if *snapshot_epoch != epoch {
778                    return Err(anyhow!("snapshot epoch {} for upstream {} differs from the snapshot epoch {} of a previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
779                }
780            }
781            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
782                anyhow!(
783                    "snapshot backfill job {} has not set snapshot epoch",
784                    job_id
785                )
786            })?;
787            for upstream_table_id in &upstream_table_ids {
788                subscribers
789                    .entry(*upstream_table_id)
790                    .or_default()
791                    .try_insert(job_id.as_raw_id(), SubscriberType::SnapshotBackfill)
792                    .expect("non-duplicate");
793            }
794            ongoing_snapshot_backfill_jobs
795                .try_insert(
796                    job_id,
797                    (
798                        fragment_infos,
799                        definition,
800                        upstream_table_ids,
801                        committed_epoch,
802                        snapshot_epoch,
803                    ),
804                )
805                .expect("non-duplicated");
806        }
807
808        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
809            database_jobs
810                .into_iter()
811                .map(|(job_id, (fragment_infos, background_job_definition))| {
812                    let status = if let Some(definition) = background_job_definition {
813                        CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::recover(
814                            job_id,
815                            definition,
816                            &fragment_infos,
817                            Default::default(),
818                            hummock_version_stats,
819                        ))
820                    } else {
821                        CreateStreamingJobStatus::Created
822                    };
823                    (
824                        job_id,
825                        InflightStreamingJobInfo {
826                            job_id,
827                            fragment_infos,
828                            subscribers: subscribers
829                                .remove(&job_id.as_mv_table_id())
830                                .unwrap_or_default(),
831                            status,
832                        },
833                    )
834                })
835                .collect()
836        };
837
838        let node_to_collect = {
839            let node_actors =
840                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
841                    job.fragment_infos.values().map(move |fragment_info| {
842                        (
843                            fragment_info.fragment_id,
844                            &fragment_info.nodes,
845                            fragment_info.actors.iter().map(move |(actor_id, actor)| {
846                                (
847                                    stream_actors.get(actor_id).expect("should exist"),
848                                    actor.worker_id,
849                                )
850                            }),
851                            job.subscribers.keys().copied(),
852                        )
853                    })
854                }));
855
856            let node_to_collect = self.inject_barrier(
857                database_id,
858                None,
859                Some(mutation.clone()),
860                &barrier_info,
861                database_jobs.values().flatten(),
862                database_jobs.values().flatten(),
863                Some(node_actors),
864            )?;
865            debug!(
866                ?node_to_collect,
867                %database_id,
868                "inject initial barrier"
869            );
870            node_to_collect
871        };
872
873        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
874            HashMap::new();
875        for (job_id, (info, definition, upstream_table_ids, committed_epoch, snapshot_epoch)) in
876            ongoing_snapshot_backfill_jobs
877        {
878            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_info| {
879                (
880                    fragment_info.fragment_id,
881                    &fragment_info.nodes,
882                    fragment_info.actors.iter().map(move |(actor_id, actor)| {
883                        (
884                            stream_actors.get(actor_id).expect("should exist"),
885                            actor.worker_id,
886                        )
887                    }),
888                    vec![], // no subscribers for backfilling jobs
889                )
890            }));
891
892            creating_streaming_job_controls.insert(
893                job_id,
894                CreatingStreamingJobControl::recover(
895                    database_id,
896                    job_id,
897                    definition,
898                    upstream_table_ids,
899                    &database_job_log_epochs,
900                    snapshot_epoch,
901                    committed_epoch,
902                    barrier_info.curr_epoch.value().0,
903                    info,
904                    hummock_version_stats,
905                    node_actors,
906                    mutation.clone(),
907                    self,
908                )?,
909            );
910        }
911
912        self.env.shared_actor_infos().recover_database(
913            database_id,
914            database_jobs
915                .values()
916                .flat_map(|info| {
917                    info.fragment_infos()
918                        .map(move |fragment| (fragment, info.job_id))
919                })
920                .chain(
921                    creating_streaming_job_controls
922                        .iter()
923                        .flat_map(|(job_id, job)| {
924                            job.graph_info()
925                                .values()
926                                .map(|fragment| (fragment, *job_id))
927                        }),
928                ),
929        );
930
931        let committed_epoch = barrier_info.prev_epoch();
932        let new_epoch = barrier_info.curr_epoch;
933        let database_state = BarrierWorkerState::recovery(
934            database_id,
935            self.env.shared_actor_infos().clone(),
936            new_epoch,
937            database_jobs.into_values(),
938            is_paused,
939        );
940        let cdc_table_backfill_tracker = self.env.cdc_table_backfill_tracker();
941        Ok(DatabaseInitialBarrierCollector {
942            database_id,
943            node_to_collect,
944            database_state,
945            creating_streaming_job_controls,
946            committed_epoch,
947            cdc_table_backfill_tracker,
948        })
949    }
950
951    pub(super) fn inject_command_ctx_barrier(
952        &mut self,
953        database_id: DatabaseId,
954        command: Option<&Command>,
955        barrier_info: &BarrierInfo,
956        is_paused: bool,
957        pre_applied_graph_info: &InflightDatabaseInfo,
958        applied_graph_info: &InflightDatabaseInfo,
959        edges: &mut Option<FragmentEdgeBuildResult>,
960    ) -> MetaResult<NodeToCollect> {
961        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
962        self.inject_barrier(
963            database_id,
964            None,
965            mutation,
966            barrier_info,
967            pre_applied_graph_info.fragment_infos(),
968            applied_graph_info.fragment_infos(),
969            command
970                .as_ref()
971                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
972                .unwrap_or_default(),
973        )
974    }
975
976    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
977        self.workers
978            .iter()
979            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
980                WorkerNodeState::Connected { control_stream, .. } => {
981                    Some((*worker_id, control_stream))
982                }
983                WorkerNodeState::Reconnecting(_) => None,
984            })
985    }
986
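    /// Inject a barrier for the partial graph identified by `(database_id, creating_job_id)`
    /// into every connected worker. `pre_applied_graph_info` determines which actors must
    /// collect the barrier, `applied_graph_info` determines which state tables to sync, and
    /// `new_actors` are built on the workers together with the barrier. Returns, for each
    /// connected worker, whether it has no actors that need to collect this barrier.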
987    pub(super) fn inject_barrier<'a>(
988        &mut self,
989        database_id: DatabaseId,
990        creating_job_id: Option<JobId>,
991        mutation: Option<Mutation>,
992        barrier_info: &BarrierInfo,
993        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
994        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
995        mut new_actors: Option<StreamJobActorsToCreate>,
996    ) -> MetaResult<NodeToCollect> {
997        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
998            "inject_barrier_err"
999        ));
1000
1001        let partial_graph_id = to_partial_graph_id(creating_job_id);
1002
1003        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);
1004
1005        for worker_id in node_actors.keys() {
1006            if !matches!(
1007                self.workers.get(worker_id),
1008                Some((_, WorkerNodeState::Connected { .. }))
1009            ) {
1010                return Err(anyhow!("unconnected worker node {}", worker_id).into());
1011            }
1012        }
1013
1014        let table_ids_to_sync: HashSet<_> =
1015            InflightFragmentInfo::existing_table_ids(applied_graph_info).collect();
1016
1017        let mut node_need_collect = HashMap::new();
1018
1019        self.connected_workers()
1020            .try_for_each(|(node_id, node)| {
1021                let actor_ids_to_collect = node_actors
1022                    .get(&node_id)
1023                    .map(|actors| actors.iter().cloned())
1024                    .into_iter()
1025                    .flatten()
1026                    .collect_vec();
1027                let is_empty = actor_ids_to_collect.is_empty();
1028                {
1029                    let mutation = mutation.clone();
1030                    let barrier = Barrier {
1031                        epoch: Some(risingwave_pb::data::Epoch {
1032                            curr: barrier_info.curr_epoch.value().0,
1033                            prev: barrier_info.prev_epoch(),
1034                        }),
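                        // `clone()` is only used to probe whether a mutation is present; the
                        // per-worker `mutation` bound above is moved into the new
                        // `BarrierMutation`.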
1035                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
1036                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
1037                            .to_protobuf(),
1038                        kind: barrier_info.kind.to_protobuf() as i32,
1039                        passed_actors: vec![],
1040                    };
1041
1042                    node.handle
1043                        .request_sender
1044                        .send(StreamingControlStreamRequest {
1045                            request: Some(
1046                                streaming_control_stream_request::Request::InjectBarrier(
1047                                    InjectBarrierRequest {
1048                                        request_id: Uuid::new_v4().to_string(),
1049                                        barrier: Some(barrier),
1050                                        database_id,
1051                                        actor_ids_to_collect,
1052                                        table_ids_to_sync: table_ids_to_sync
1053                                            .iter()
1054                                            .cloned()
1055                                            .collect(),
1056                                        partial_graph_id,
1057                                        actors_to_build: new_actors
1058                                            .as_mut()
1059                                            .map(|new_actors| new_actors.remove(&(node_id as _)))
1060                                            .into_iter()
1061                                            .flatten()
1062                                            .flatten()
1063                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
1064                                                FragmentBuildActorInfo {
1065                                                    fragment_id,
1066                                                    node: Some(node),
1067                                                    actors: actors
1068                                                        .into_iter()
1069                                                        .map(|(actor, upstreams, dispatchers)| {
1070                                                            BuildActorInfo {
1071                                                                actor_id: actor.actor_id,
1072                                                                fragment_upstreams: upstreams
1073                                                                    .into_iter()
1074                                                                    .map(|(fragment_id, upstreams)| {
1075                                                                        (
1076                                                                            fragment_id,
1077                                                                            UpstreamActors {
1078                                                                                actors: upstreams
1079                                                                                    .into_values()
1080                                                                                    .collect(),
1081                                                                            },
1082                                                                        )
1083                                                                    })
1084                                                                    .collect(),
1085                                                                dispatchers,
1086                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
1087                                                                mview_definition: actor.mview_definition,
1088                                                                expr_context: actor.expr_context,
1089                                                                config_override: actor.config_override.to_string(),
1090                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
1091                                                            }
1092                                                        })
1093                                                        .collect(),
1094                                                }
1095                                            })
1096                                            .collect(),
1097                                    },
1098                                ),
1099                            ),
1100                        })
1101                        .map_err(|_| {
1102                            MetaError::from(anyhow!(
1103                                "failed to send request to {} {:?}",
1104                                node.worker_id,
1105                                node.host
1106                            ))
1107                        })?;
1108
1109                    node_need_collect.insert(node_id as WorkerId, is_empty);
1110                    Result::<_, MetaError>::Ok(())
1111                }
1112            })
1113            .inspect_err(|e| {
1114                // Record failure in event log.
1115                use risingwave_pb::meta::event_log;
1116                let event = event_log::EventInjectBarrierFail {
1117                    prev_epoch: barrier_info.prev_epoch(),
1118                    cur_epoch: barrier_info.curr_epoch.value().0,
1119                    error: e.to_report_string(),
1120                };
1121                self.env
1122                    .event_log_manager_ref()
1123                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
1124            })?;
1125        Ok(node_need_collect)
1126    }
1127
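    /// Ask every connected worker to create the partial graph identified by
    /// `(database_id, creating_job_id)`. Send failures are logged and otherwise ignored.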
1128    pub(super) fn add_partial_graph(
1129        &mut self,
1130        database_id: DatabaseId,
1131        creating_job_id: Option<JobId>,
1132    ) {
1133        let partial_graph_id = to_partial_graph_id(creating_job_id);
1134        self.connected_workers().for_each(|(_, node)| {
1135            if node
1136                .handle
1137                .request_sender
1138                .send(StreamingControlStreamRequest {
1139                    request: Some(
1140                        streaming_control_stream_request::Request::CreatePartialGraph(
1141                            CreatePartialGraphRequest {
1142                                database_id,
1143                                partial_graph_id,
1144                            },
1145                        ),
1146                    ),
1147                }).is_err() {
1148                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "failed to add partial graph to worker");
1149            }
1150        });
1151    }
1152
1153    pub(super) fn remove_partial_graph(
1154        &mut self,
1155        database_id: DatabaseId,
1156        creating_job_ids: Vec<JobId>,
1157    ) {
1158        if creating_job_ids.is_empty() {
1159            return;
1160        }
1161        let partial_graph_ids = creating_job_ids
1162            .into_iter()
1163            .map(|job_id| to_partial_graph_id(Some(job_id)))
1164            .collect_vec();
1165        self.connected_workers().for_each(|(_, node)| {
1166            if node.handle
1167                .request_sender
1168                .send(StreamingControlStreamRequest {
1169                    request: Some(
1170                        streaming_control_stream_request::Request::RemovePartialGraph(
1171                            RemovePartialGraphRequest {
1172                                partial_graph_ids: partial_graph_ids.clone(),
1173                                database_id,
1174                            },
1175                        ),
1176                    ),
1177                })
1178                .is_err()
1179            {
1180                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
1181            }
1182        })
1183    }
1184
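    /// Send a `ResetDatabase` request to every connected worker and return the set of workers
    /// the request was successfully sent to.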
1185    pub(super) fn reset_database(
1186        &mut self,
1187        database_id: DatabaseId,
1188        reset_request_id: u32,
1189    ) -> HashSet<WorkerId> {
1190        self.connected_workers()
1191            .filter_map(|(worker_id, node)| {
1192                if node
1193                    .handle
1194                    .request_sender
1195                    .send(StreamingControlStreamRequest {
1196                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
1197                            ResetDatabaseRequest {
1198                                database_id,
1199                                reset_request_id,
1200                            },
1201                        )),
1202                    })
1203                    .is_err()
1204                {
1205                    warn!(%worker_id, node = ?node.host, "failed to send reset database request");
1206                    None
1207                } else {
1208                    Some(worker_id)
1209                }
1210            })
1211            .collect()
1212    }
1213}
1214
1215impl GlobalBarrierWorkerContextImpl {
1216    pub(super) async fn new_control_stream_impl(
1217        &self,
1218        node: &WorkerNode,
1219        init_request: &PbInitRequest,
1220    ) -> MetaResult<StreamingControlHandle> {
1221        let handle = self
1222            .env
1223            .stream_client_pool()
1224            .get(node)
1225            .await?
1226            .start_streaming_control(init_request.clone())
1227            .await?;
1228        Ok(handle)
1229    }
1230}
1231
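/// Merge per-worker RPC errors into a single [`MetaError`]: an empty list yields a bare
/// `message`, a single error is wrapped with its worker id, scored errors are reduced to the
/// highest-scored one, and unscored errors are concatenated into one message.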
1232pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
1233    message: &str,
1234    errors: impl IntoIterator<Item = (WorkerId, E)>,
1235) -> MetaError {
1236    use std::fmt::Write;
1237
1238    use risingwave_common::error::error_request_copy;
1239    use risingwave_common::error::tonic::extra::Score;
1240
1241    let errors = errors.into_iter().collect_vec();
1242
1243    if errors.is_empty() {
1244        return anyhow!(message.to_owned()).into();
1245    }
1246
1247    // Construct a `MetaError` from a single `(worker_id, error)` pair.
1248    let single_error = |(worker_id, e)| {
1249        anyhow::Error::from(e)
1250            .context(format!("{message}, in worker node {worker_id}"))
1251            .into()
1252    };
1253
1254    if errors.len() == 1 {
1255        return single_error(errors.into_iter().next().unwrap());
1256    }
1257
1258    // Find the error with the highest score.
1259    let max_score = errors
1260        .iter()
1261        .filter_map(|(_, e)| error_request_copy::<Score>(e))
1262        .max();
1263
1264    if let Some(max_score) = max_score {
1265        let mut errors = errors;
1266        let max_scored = errors
1267            .extract_if(.., |(_, e)| {
1268                error_request_copy::<Score>(e) == Some(max_score)
1269            })
1270            .next()
1271            .unwrap();
1272
1273        return single_error(max_scored);
1274    }
1275
1276    // The errors do not have scores, so simply concatenate them.
1277    let concat: String = errors
1278        .into_iter()
1279        .fold(format!("{message}: "), |mut s, (w, e)| {
1280            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
1281            s
1282        });
1283    anyhow!(concat).into()
1284}