risingwave_meta/barrier/
worker.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::pin::pin;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use arc_swap::ArcSwap;
24use futures::{TryFutureExt, pin_mut};
25use itertools::Itertools;
26use risingwave_common::catalog::DatabaseId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::system_param::{AdaptiveParallelismStrategy, PAUSE_ON_NEXT_BOOTSTRAP_KEY};
29use risingwave_meta_model::WorkerId;
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::meta::Recovery;
32use risingwave_pb::meta::subscribe_response::{Info, Operation};
33use thiserror_ext::AsReport;
34use tokio::select;
35use tokio::sync::mpsc;
36use tokio::sync::oneshot::{Receiver, Sender};
37use tokio::task::JoinHandle;
38use tonic::Status;
39use tracing::{Instrument, debug, error, info, warn};
40
41use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
42use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
43use crate::barrier::context::recovery::{RenderedDatabaseRuntimeInfo, render_runtime_info};
44use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
45use crate::barrier::info::InflightDatabaseInfo;
46use crate::barrier::rpc::{
47    DatabaseInitialBarrierCollector, database_partial_graphs, from_partial_graph_id,
48    merge_node_rpc_errors,
49};
50use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
51use crate::barrier::{
52    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
53    RecoveryReason, RescheduleContext, UpdateDatabaseBarrierRequest, schedule,
54};
55use crate::controller::scale::{materialize_actor_assignments, preview_actor_assignments};
56use crate::error::MetaErrorInner;
57use crate::hummock::HummockManagerRef;
58use crate::manager::sink_coordination::SinkCoordinatorManager;
59use crate::manager::{
60    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
61    MetadataManager,
62};
63use crate::rpc::metrics::GLOBAL_META_METRICS;
64use crate::stream::{
65    GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef, build_reschedule_commands,
66    rendered_layout_matches_current,
67};
68use crate::{MetaError, MetaResult};
69
/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
/// collect them, with monotonic increasing epoch numbers. On compute nodes, `LocalBarrierManager`
/// in `risingwave_stream` crate will serve these requests and dispatch them to source actors.
///
/// Configuration change in our system is achieved by the mutation in the barrier. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state machine,
/// accepting [`crate::barrier::command::Command`] that carries info to build `Mutation`. To keep the consistency between
/// barrier manager and meta store, some actions like "drop materialized view" or "create mv on mv"
/// must be done in barrier manager transactional using [`crate::barrier::command::Command`].
pub(super) struct GlobalBarrierWorker<C> {
    /// Enable recovery or not when failover.
    enable_recovery: bool,

    /// The queue of scheduled barriers.
    periodic_barriers: PeriodicBarriers,

    /// Whether per database failure isolation is enabled in system parameters.
    /// Note that the effective setting additionally requires the
    /// `DatabaseFailureIsolation` license feature (see `enable_per_database_isolation`).
    system_enable_per_database_isolation: bool,

    /// Strategy for deriving adaptive parallelism; refreshed on
    /// `SystemParamsChange` notifications in the event loop.
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,

    /// Shared worker context (scheduling, metadata access, recovery hooks).
    pub(super) context: Arc<C>,

    /// Meta service environment: system params, notification manager, id generators.
    env: MetaSrvEnv,

    /// Per-database checkpoint state machine driving barrier collection and completion.
    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    /// Incoming requests (progress queries, adhoc recovery, per-database
    /// barrier configuration updates) handled in the event loop.
    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    /// Live view of streaming compute workers; add/update/remove changes are
    /// forwarded to `partial_graph_manager`.
    active_streaming_nodes: ActiveStreamingWorkerNodes,

    /// Manages per-worker control streams and emits partial-graph events.
    partial_graph_manager: PartialGraphManager,
}
107
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tokio::sync::oneshot;

    use super::*;
    use crate::barrier::RescheduleContext;
    use crate::barrier::notifier::Notifier;

    /// A `RescheduleIntent` resolved with no active workers must fail fast:
    /// the intent is swallowed (`Ok(None)`) and every notifier gets a
    /// `notify_start_failed` so callers are unblocked with an error.
    #[tokio::test]
    async fn test_reschedule_intent_without_workers_notifies_start_failed() {
        let env = MetaSrvEnv::for_test().await;
        let database_id = DatabaseId::new(1);
        let database_info =
            InflightDatabaseInfo::empty(database_id, env.shared_actor_infos().clone());
        let (started_tx, started_rx) = oneshot::channel();
        // The sender IS used below (moved into the notifier), so it must not
        // carry an underscore prefix; only the receiver is intentionally unused.
        let (collected_tx, _collected_rx) = oneshot::channel();

        let notifier = Notifier {
            started: Some(started_tx),
            collected: Some(collected_tx),
        };

        let new_barrier = schedule::NewBarrier {
            database_id,
            command: Some((
                Command::RescheduleIntent {
                    context: RescheduleContext::empty(),
                    reschedule_plan: None,
                },
                vec![notifier],
            )),
            span: tracing::Span::none(),
            checkpoint: false,
        };

        let result = resolve_reschedule_intent(
            env,
            HashMap::new(),
            AdaptiveParallelismStrategy::default(),
            Some(&database_info),
            new_barrier,
        );

        assert!(matches!(result, Ok(None)));
        let started = started_rx.await.expect("started notifier dropped");
        assert!(started.is_err());
    }
}
158
159impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
160    pub(super) async fn new_inner(
161        env: MetaSrvEnv,
162        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
163        context: Arc<C>,
164    ) -> Self {
165        let enable_recovery = env.opts.enable_recovery;
166
167        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();
168
169        let partial_graph_manager = PartialGraphManager::uninitialized(env.clone());
170
171        let reader = env.system_params_reader().await;
172        let system_enable_per_database_isolation = reader.per_database_isolation();
173        // Load config will be performed in bootstrap phase.
174        let periodic_barriers = PeriodicBarriers::default();
175        let adaptive_parallelism_strategy = reader.adaptive_parallelism_strategy();
176
177        let checkpoint_control = CheckpointControl::new(env.clone());
178        Self {
179            enable_recovery,
180            periodic_barriers,
181            system_enable_per_database_isolation,
182            adaptive_parallelism_strategy,
183            context,
184            env,
185            checkpoint_control,
186            completing_task: CompletingTask::None,
187            request_rx,
188            active_streaming_nodes,
189            partial_graph_manager,
190        }
191    }
192}
193
194fn resolve_reschedule_intent(
195    env: MetaSrvEnv,
196    worker_nodes: HashMap<WorkerId, WorkerNode>,
197    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
198    database_info: Option<&InflightDatabaseInfo>,
199    mut new_barrier: schedule::NewBarrier,
200) -> MetaResult<Option<schedule::NewBarrier>> {
201    let Some((command, notifiers)) = new_barrier.command.take() else {
202        return Ok(Some(new_barrier));
203    };
204
205    match command {
206        Command::RescheduleIntent {
207            context,
208            reschedule_plan,
209        } => {
210            if let Some(reschedule_plan) = reschedule_plan {
211                new_barrier.command = Some((
212                    Command::RescheduleIntent {
213                        context,
214                        reschedule_plan: Some(reschedule_plan),
215                    },
216                    notifiers,
217                ));
218                return Ok(Some(new_barrier));
219            }
220            let span = tracing::info_span!(
221                "resolve_reschedule_intent",
222                database_id = %new_barrier.database_id
223            );
224            let reschedule_plan = {
225                let _guard = span.enter();
226                build_reschedule_from_context(
227                    &env,
228                    worker_nodes,
229                    adaptive_parallelism_strategy,
230                    new_barrier.database_id,
231                    context,
232                    database_info.ok_or_else(|| {
233                        anyhow!(
234                            "database {} not found when resolving reschedule intent",
235                            new_barrier.database_id
236                        )
237                    })?,
238                )
239            };
240            match reschedule_plan {
241                Ok(Some(reschedule_plan)) => {
242                    new_barrier.command = Some((
243                        Command::RescheduleIntent {
244                            context: RescheduleContext::empty(),
245                            reschedule_plan: Some(reschedule_plan),
246                        },
247                        notifiers,
248                    ));
249                    Ok(Some(new_barrier))
250                }
251                Ok(None) => {
252                    // No-op intent: notify to unblock callers even though no barrier is injected.
253                    for mut notifier in notifiers {
254                        notifier.notify_started();
255                        notifier.notify_collected();
256                    }
257                    Ok(None)
258                }
259                Err(err) => {
260                    for notifier in notifiers {
261                        notifier.notify_start_failed(err.clone());
262                    }
263                    Ok(None)
264                }
265            }
266        }
267        _ => {
268            new_barrier.command = Some((command, notifiers));
269            Ok(Some(new_barrier))
270        }
271    }
272}
273
274fn build_reschedule_from_context(
275    env: &MetaSrvEnv,
276    worker_nodes: HashMap<WorkerId, WorkerNode>,
277    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
278    database_id: DatabaseId,
279    context: RescheduleContext,
280    database_info: &InflightDatabaseInfo,
281) -> MetaResult<Option<crate::barrier::ReschedulePlan>> {
282    if worker_nodes.is_empty() {
283        return Err(anyhow!("no active streaming workers for reschedule").into());
284    }
285
286    if context.is_empty() {
287        return Ok(None);
288    }
289
290    // Barrier worker resolves this intent against a stable in-flight snapshot.
291    // Reuse the same fragment view for preview comparison and command building.
292    let all_prev_fragments = database_info
293        .fragment_infos()
294        .map(|fragment| (fragment.fragment_id, fragment))
295        .collect();
296
297    let previewed = preview_actor_assignments(
298        &worker_nodes,
299        adaptive_parallelism_strategy,
300        &context.loaded,
301    )?;
302
303    if rendered_layout_matches_current(&previewed.fragments, &all_prev_fragments)? {
304        return Ok(None);
305    }
306
307    let actor_id_counter = env.actor_id_generator();
308    // Materialization only replaces preview actor ids with real ids. Worker
309    // placement, vnode ownership, and split assignment remain unchanged.
310    let rendered = materialize_actor_assignments(actor_id_counter, previewed);
311    let mut commands = build_reschedule_commands(rendered.fragments, context, all_prev_fragments)?;
312    Ok(commands.remove(&database_id))
313}
314
315impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
316    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
317    pub async fn new(
318        scheduled_barriers: schedule::ScheduledBarriers,
319        env: MetaSrvEnv,
320        metadata_manager: MetadataManager,
321        hummock_manager: HummockManagerRef,
322        source_manager: SourceManagerRef,
323        sink_manager: SinkCoordinatorManager,
324        scale_controller: ScaleControllerRef,
325        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
326        barrier_scheduler: schedule::BarrierScheduler,
327        refresh_manager: GlobalRefreshManagerRef,
328    ) -> Self {
329        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));
330
331        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
332            scheduled_barriers,
333            status,
334            metadata_manager,
335            hummock_manager,
336            source_manager,
337            scale_controller,
338            env.clone(),
339            barrier_scheduler,
340            refresh_manager,
341            sink_manager,
342        ));
343
344        Self::new_inner(env, request_rx, context).await
345    }
346
347    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
348        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
349        let fut = (self.env.await_tree_reg())
350            .register_derived_root("Global Barrier Worker")
351            .instrument(self.run(shutdown_rx));
352        let join_handle = tokio::spawn(fut);
353
354        (join_handle, shutdown_tx)
355    }
356
357    /// Check whether we should pause on bootstrap from the system parameter and reset it.
358    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
359        let paused = self
360            .env
361            .system_params_reader()
362            .await
363            .pause_on_next_bootstrap()
364            || self.env.opts.pause_on_next_bootstrap_offline;
365
366        if paused {
367            warn!(
368                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
369                 It will now be reset to `false`. \
370                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
371                PAUSE_ON_NEXT_BOOTSTRAP_KEY
372            );
373            self.env
374                .system_params_manager_impl_ref()
375                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
376                .await?;
377        }
378        Ok(paused)
379    }
380
381    /// Start an infinite loop to take scheduled barriers and send them.
382    async fn run(mut self, shutdown_rx: Receiver<()>) {
383        tracing::info!(
384            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
385            self.enable_recovery,
386            self.checkpoint_control.in_flight_barrier_nums,
387        );
388
389        if !self.enable_recovery {
390            let job_exist = self
391                .context
392                .metadata_manager
393                .catalog_controller
394                .has_any_streaming_jobs()
395                .await
396                .unwrap();
397            if job_exist {
398                panic!(
399                    "Some streaming jobs already exist in meta, please start with recovery enabled \
400                or clean up the metadata using `./risedev clean-data`"
401                );
402            }
403        }
404
405        {
406            // Bootstrap recovery. Here we simply trigger a recovery process to achieve the
407            // consistency.
408            // Even if there's no actor to recover, we still go through the recovery process to
409            // inject the first `Initial` barrier.
410            let span = tracing::info_span!("bootstrap_recovery");
411            crate::telemetry::report_event(
412                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
413                "normal_recovery",
414                0,
415                None,
416                None,
417                None,
418            );
419
420            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
421
422            self.recovery(paused, RecoveryReason::Bootstrap)
423                .instrument(span)
424                .await;
425        }
426
427        Box::pin(self.run_inner(shutdown_rx)).await
428    }
429}
430
431impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
432    fn enable_per_database_isolation(&self) -> bool {
433        self.system_enable_per_database_isolation && {
434            if let Err(e) =
435                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
436            {
437                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
438                false
439            } else {
440                true
441            }
442        }
443    }
444
445    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
446        let (local_notification_tx, mut local_notification_rx) =
447            tokio::sync::mpsc::unbounded_channel();
448        self.env
449            .notification_manager()
450            .insert_local_sender(local_notification_tx);
451
452        // Start the event loop.
453        loop {
454            tokio::select! {
455                biased;
456
457                // Shutdown
458                _ = &mut shutdown_rx => {
459                    tracing::info!("Barrier manager is stopped");
460                    break;
461                }
462
463                request = self.request_rx.recv() => {
464                    if let Some(request) = request {
465                        match request {
466                            BarrierManagerRequest::GetBackfillProgress(result_tx) => {
467                                let progress = self.checkpoint_control.gen_backfill_progress();
468                                if result_tx.send(Ok(progress)).is_err() {
469                                    error!("failed to send get ddl progress");
470                                }
471                            }
472                            BarrierManagerRequest::GetFragmentBackfillProgress(result_tx) => {
473                                let progress =
474                                    self.checkpoint_control.gen_fragment_backfill_progress();
475                                if result_tx.send(Ok(progress)).is_err() {
476                                    error!("failed to send get fragment backfill progress");
477                                }
478                            }
479                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
480                                let progress = self.checkpoint_control.gen_cdc_progress();
481                                if result_tx.send(Ok(progress)).is_err() {
482                                    error!("failed to send get ddl progress");
483                                }
484                            }
485                            // Handle adhoc recovery triggered by user.
486                            BarrierManagerRequest::AdhocRecovery(sender) => {
487                                self.adhoc_recovery().await;
488                                if sender.send(()).is_err() {
489                                    warn!("failed to notify finish of adhoc recovery");
490                                }
491                            }
492                            BarrierManagerRequest::UpdateDatabaseBarrier( UpdateDatabaseBarrierRequest {
493                                database_id,
494                                barrier_interval_ms,
495                                checkpoint_frequency,
496                                sender,
497                            }) => {
498                                self.periodic_barriers
499                                    .update_database_barrier(
500                                        database_id,
501                                        barrier_interval_ms,
502                                        checkpoint_frequency,
503                                    );
504                                if sender.send(()).is_err() {
505                                    warn!("failed to notify finish of update database barrier");
506                                }
507                            }
508                            BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
509                                if tx.send(self.checkpoint_control.may_have_snapshot_backfilling_jobs()).is_err() {
510                                    warn!("failed to may have snapshot backfill job");
511                                }
512                            }
513                        }
514                    } else {
515                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
516                        return;
517                    }
518                }
519
520                changed_worker = self.active_streaming_nodes.changed() => {
521                    #[cfg(debug_assertions)]
522                    {
523                        self.active_streaming_nodes.validate_change().await;
524                    }
525
526                    info!(?changed_worker, "worker changed");
527
528                    match changed_worker {
529                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
530                            self.partial_graph_manager.add_worker(node, &*self.context).await;
531                        }
532                        ActiveStreamingWorkerChange::Remove(node) => {
533                            self.partial_graph_manager.remove_worker(node);
534                        }
535                    }
536                }
537
538                notification = local_notification_rx.recv() => {
539                    let notification = notification.unwrap();
540                    if let LocalNotification::SystemParamsChange(p) = notification {
541                        {
542                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
543                            self.periodic_barriers
544                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
545                            self.system_enable_per_database_isolation = p.per_database_isolation();
546                            self.adaptive_parallelism_strategy = p.adaptive_parallelism_strategy();
547                        }
548                    }
549                }
550                complete_result = self
551                    .completing_task
552                    .next_completed_barrier(
553                        &mut self.periodic_barriers,
554                        &mut self.checkpoint_control,
555                        &mut self.partial_graph_manager,
556                        &self.context,
557                        &self.env,
558                ) => {
559                    match complete_result {
560                        Ok(output) => {
561                            self.checkpoint_control.ack_completed(output);
562                        }
563                        Err(e) => {
564                            self.failure_recovery(e).await;
565                        }
566                    }
567                },
568                event = self.checkpoint_control.next_event() => {
569                    let result: MetaResult<()> = try {
570                        match event {
571                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
572                                let database_id = entering_initializing.database_id();
573                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
574                                    resp.root_err.as_ref().map(|root_err| {
575                                        (*worker_id, ScoredError {
576                                            error: Status::internal(&root_err.err_msg),
577                                            score: Score(root_err.score)
578                                        })
579                                    })
580                                }));
581                                Self::report_collect_failure(&self.env, &error);
582                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
583                                let result: MetaResult<_> = try {
584                                    let runtime_info = self.context.reload_database_runtime_info(database_id).await.inspect_err(|err| {
585                                        warn!(%database_id, err = %err.as_report(), "reload runtime info failed");
586                                    })?;
587                                    let rendered_info = render_runtime_info(
588                                        self.env.actor_id_generator(),
589                                        &self.active_streaming_nodes,
590                                        self.adaptive_parallelism_strategy,
591                                        &runtime_info.recovery_context,
592                                        database_id,
593                                    )
594                                    .inspect_err(|err: &MetaError| {
595                                        warn!(%database_id, err = %err.as_report(), "render runtime info failed");
596                                    })?;
597                                    if let Some(rendered_info) = rendered_info {
598                                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
599                                            database_id,
600                                            &rendered_info.job_infos,
601                                            &self.active_streaming_nodes,
602                                            &rendered_info.stream_actors,
603                                            &runtime_info.state_table_committed_epochs,
604                                        )
605                                        .inspect_err(|err| {
606                                            warn!(%database_id, err = ?err.as_report(), "database runtime info failed validation");
607                                        })?;
608                                        Some((runtime_info, rendered_info))
609                                    } else {
610                                        None
611                                    }
612                                };
613                                match result {
614                                    Ok(Some((runtime_info, rendered_info))) => {
615                                        entering_initializing.enter(
616                                            runtime_info,
617                                            rendered_info,
618                                            &mut self.partial_graph_manager,
619                                        );
620                                    }
621                                    Ok(None) => {
622                                        info!(%database_id, "database removed after reloading empty runtime info");
623                                        // mark ready to unblock subsequent request
624                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
625                                        entering_initializing.remove();
626                                    }
627                                    Err(e) => {
628                                        entering_initializing.fail_reload_runtime_info(e);
629                                    }
630                                }
631                            }
632                            CheckpointControlEvent::EnteringRunning(entering_running) => {
633                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
634                                entering_running.enter();
635                            }
636                        }
637                    };
638                    if let Err(e) = result {
639                        self.failure_recovery(e).await;
640                    }
641                }
642                event = self.partial_graph_manager.next_event(&self.context) => {
643                    let result: MetaResult<()> = try {
644                        match event {
645                            PartialGraphManagerEvent::Worker(_worker_id, WorkerEvent::WorkerConnected) => {
646                                // no handling on new worker connected event yet
647                            }
648                            PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
649                                let failed_databases = self
650                                    .checkpoint_control
651                                    .databases_failed_at_worker_err(worker_id)
652                                    .chain(
653                                        affected_partial_graphs
654                                        .into_iter()
655                                        .map(|partial_graph_id| {
656                                            let (database_id, _) = from_partial_graph_id(partial_graph_id);
657                                            database_id
658                                        })
659                                    )
660                                    .collect::<HashSet<_>>();
661                                if !failed_databases.is_empty() {
662                                    if !self.enable_recovery {
663                                        panic!("control stream to worker {} failed but recovery not enabled: {}", worker_id, err.as_report());
664                                    }
665                                    if !self.enable_per_database_isolation() {
666                                        Err(err.clone())?;
667                                    }
668                                    Self::report_collect_failure(&self.env, &err);
669                                    for database_id in failed_databases {
670                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
671                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
672                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
673                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
674                                            // TODO: add log on blocking time
675                                            let output = self.completing_task.wait_completing_task().await?;
676                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
677                                        }
678                                    }
679                                }  else {
680                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
681                                }
682                                continue;
683                            }
684                            PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
685                                let (database_id, _creating_job_id) = from_partial_graph_id(partial_graph_id);
686                                match event {
687                                    PartialGraphEvent::BarrierCollected(collected_barrier) => {
688                                        self.checkpoint_control.barrier_collected(partial_graph_id, collected_barrier, &mut self.periodic_barriers)?;
689                                    }
690                                    PartialGraphEvent::Error(worker_id) => {
691                                        if !self.enable_recovery {
692                                            panic!("database {database_id} failure reported from {worker_id} but recovery not enabled")
693                                        }
694                                        if !self.enable_per_database_isolation() {
695                                                Err(anyhow!("database {database_id} report failure from {worker_id}"))?;
696                                            }
697                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
698                                            warn!(%database_id, "database entering recovery");
699                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
700                                            // TODO: add log on blocking time
701                                            let output = self.completing_task.wait_completing_task().await?;
702                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
703                                        }
704                                    }
705                                    PartialGraphEvent::Reset(reset_resps) => {
706                                        self.checkpoint_control.on_partial_graph_reset(partial_graph_id, reset_resps);
707                                    }
708                                    PartialGraphEvent::Initialized => {
709                                        self.checkpoint_control.on_partial_graph_initialized(partial_graph_id);
710                                    }
711                                }
712                            }
713                        };
714                    };
715                    if let Err(e) = result {
716                        self.failure_recovery(e).await;
717                    }
718                }
719                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
720                    let database_id = new_barrier.database_id;
721                    let new_barrier = if matches!(
722                        new_barrier.command,
723                        Some((Command::RescheduleIntent { .. }, _))
724                    ) {
725                        let env = self.env.clone();
726                        let worker_nodes = self
727                            .active_streaming_nodes
728                            .current()
729                            .iter()
730                            .map(|(worker_id, worker)| (*worker_id, worker.clone()))
731                            .collect();
732                        let adaptive_parallelism_strategy = self.adaptive_parallelism_strategy;
733                        let database_info = self.checkpoint_control.database_info(database_id);
734                        match resolve_reschedule_intent(
735                            env,
736                            worker_nodes,
737                            adaptive_parallelism_strategy,
738                            database_info,
739                            new_barrier,
740                        ) {
741                            Ok(Some(new_barrier)) => new_barrier,
742                            Ok(None) => continue,
743                            Err(err) => {
744                                self.failure_recovery(err).await;
745                                continue;
746                            }
747                        }
748                    } else {
749                        new_barrier
750                    };
751                    if let Err(e) = self.checkpoint_control.handle_new_barrier(
752                        new_barrier,
753                        &mut self.partial_graph_manager,
754                        self.adaptive_parallelism_strategy,
755                        self.active_streaming_nodes.current()
756                    ) {
757                        if !self.enable_recovery {
758                            panic!(
759                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
760                                    database_id,
761                                    e.as_report()
762                                )
763                            );
764                        }
765                        let result: MetaResult<_> = try {
766                            if !self.enable_per_database_isolation() {
767                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
768                                Err(err)?;
769                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
770                                warn!(%database_id, e = %e.as_report(),"database entering recovery on inject failure");
771                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
772                                // TODO: add log on blocking time
773                                let output = self.completing_task.wait_completing_task().await?;
774                                entering_recovery.enter(output, &mut self.partial_graph_manager);
775                            }
776                        };
777                        if let Err(e) = result {
778                            self.failure_recovery(e).await;
779                        }
780                    }
781                }
782            }
783        }
784    }
785}
786
787impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
788    /// We need to make sure there are no changes when doing recovery
789    pub async fn clear_on_err(&mut self, err: &MetaError) {
790        // join spawned completing command to finish no matter it succeeds or not.
791        match replace(&mut self.completing_task, CompletingTask::None) {
792            CompletingTask::None | CompletingTask::Err(_) => {}
793            CompletingTask::Completing {
794                epochs_to_ack,
795                join_handle,
796                ..
797            } => {
798                info!("waiting for completing command to finish in recovery");
799                match join_handle.await {
800                    Err(e) => {
801                        warn!(err = %e.as_report(), "failed to join completing task");
802                    }
803                    Ok(Err(e)) => {
804                        warn!(
805                            err = %e.as_report(),
806                            "failed to complete barrier during clear"
807                        );
808                    }
809                    Ok(Ok(hummock_version_stats)) => {
810                        self.checkpoint_control
811                            .ack_completed(BarrierCompleteOutput {
812                                epochs_to_ack,
813                                hummock_version_stats,
814                            });
815                    }
816                }
817            }
818        };
819        self.partial_graph_manager.notify_all_err(err);
820    }
821}
822
823impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
824    /// Set barrier manager status.
825    async fn failure_recovery(&mut self, err: MetaError) {
826        self.clear_on_err(&err).await;
827
828        if self.enable_recovery {
829            let span = tracing::info_span!(
830                "failure_recovery",
831                error = %err.as_report(),
832            );
833
834            crate::telemetry::report_event(
835                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
836                "failure_recovery",
837                0,
838                None,
839                None,
840                None,
841            );
842
843            let reason = RecoveryReason::Failover(err);
844
845            // No need to clean dirty tables for barrier recovery,
846            // The foreground stream job should cleanup their own tables.
847            self.recovery(false, reason).instrument(span).await;
848        } else {
849            panic!(
850                "a streaming error occurred while recovery is disabled, aborting: {:?}",
851                err.as_report()
852            );
853        }
854    }
855
856    async fn adhoc_recovery(&mut self) {
857        let err = MetaErrorInner::AdhocRecovery.into();
858        self.clear_on_err(&err).await;
859
860        let span = tracing::info_span!(
861            "adhoc_recovery",
862            error = %err.as_report(),
863        );
864
865        crate::telemetry::report_event(
866            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
867            "adhoc_recovery",
868            0,
869            None,
870            None,
871            None,
872        );
873
874        // No need to clean dirty tables for barrier recovery,
875        // The foreground stream job should cleanup their own tables.
876        self.recovery(false, RecoveryReason::Adhoc)
877            .instrument(span)
878            .await;
879    }
880}
881
882impl<C> GlobalBarrierWorker<C> {
883    /// Send barrier-complete-rpc and wait for responses from all CNs
884    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
885        // Record failure in event log.
886        use risingwave_pb::meta::event_log;
887        let event = event_log::EventCollectBarrierFail {
888            error: error.to_report_string(),
889        };
890        env.event_log_manager_ref()
891            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
892    }
893}
894
mod retry_strategy {
    //! Backoff strategy used when retrying recovery attempts: exponential
    //! backoff with jitter, capped at a maximum delay.

    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Retry base interval in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Retry max interval.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    // MrCroxx: Use concrete type here to prevent unsolved compiler issue.
    // Feel free to replace the concrete type with TAIT after fixed.

    // mod retry_backoff_future {
    //     use std::future::Future;
    //     use std::time::Duration;
    //
    //     use tokio::time::sleep;
    //
    //     pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
    //
    //     #[define_opaque(RetryBackoffFuture)]
    //     pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
    //         Box::pin(sleep(duration))
    //     }
    // }
    // pub(crate) use retry_backoff_future::*;

    /// Future that resolves once a single backoff delay has elapsed.
    /// Boxed and pinned so it has a nameable concrete type (see note above).
    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    /// Turn a backoff `Duration` into a [`RetryBackoffFuture`] that sleeps for it.
    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    /// Infinite sequence of successive backoff futures, one per retry attempt.
    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// Initialize a retry strategy for operation in recovery.
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    /// Same schedule as [`get_retry_strategy`], but yielding ready-to-await
    /// sleep futures instead of raw durations.
    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}
945
946pub(crate) use retry_strategy::*;
947use risingwave_common::error::tonic::extra::{Score, ScoredError};
948use risingwave_pb::meta::event_log::{Event, EventRecovery};
949
950use crate::barrier::partial_graph::{
951    PartialGraphEvent, PartialGraphManager, PartialGraphManagerEvent, WorkerEvent,
952};
953
954impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
955    /// Recovery the whole cluster from the latest epoch.
956    ///
957    /// If `paused_reason` is `Some`, all data sources (including connectors and DMLs) will be
958    /// immediately paused after recovery, until the user manually resume them either by restarting
959    /// the cluster or `risectl` command. Used for debugging purpose.
960    ///
961    /// Returns the new state of the barrier manager after recovery.
962    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
963        // Clear all control streams to release resources (connections to compute nodes) first.
964        self.partial_graph_manager.clear_worker();
965
966        let reason_str = match &recovery_reason {
967            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
968            RecoveryReason::Failover(err) => {
969                format!("failed over: {}", err.as_report())
970            }
971            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
972        };
973        self.context.abort_and_mark_blocked(None, recovery_reason);
974
975        self.recovery_inner(is_paused, reason_str).await;
976        self.context.mark_ready(MarkReadyOptions::Global {
977            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
978        });
979    }
980
981    #[await_tree::instrument("recovery({recovery_reason})")]
982    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
983        let event_log_manager_ref = self.env.event_log_manager_ref();
984
985        tracing::info!("recovery start!");
986        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
987            EventRecovery::global_recovery_start(recovery_reason.clone()),
988        )]);
989
990        let retry_strategy = get_retry_strategy();
991
992        // We take retry into consideration because this is the latency user sees for a cluster to
993        // get recovered.
994        let recovery_timer = GLOBAL_META_METRICS
995            .recovery_latency
996            .with_label_values(&["global"])
997            .start_timer();
998
999        let enable_per_database_isolation = self.enable_per_database_isolation();
1000
1001        let recovery_future = tokio_retry::Retry::spawn(retry_strategy, || async {
1002            self.env.stream_client_pool().invalidate_all();
1003            // We need to notify_creating_job_failed in every recovery retry, because in outer create_streaming_job handler,
1004            // it holds the reschedule_read_lock and wait for creating job to finish, and caused the following scale_actor fail
1005            // to acquire the reschedule_write_lock, and then keep recovering, and then deadlock.
1006            // TODO: refactor and fix this hacky implementation.
1007            self.context
1008                .notify_creating_job_failed(None, recovery_reason.clone())
1009                .await;
1010
1011            let runtime_info_snapshot = self
1012                .context
1013                .reload_runtime_info()
1014                .await?;
1015            let BarrierWorkerRuntimeInfoSnapshot {
1016                active_streaming_nodes,
1017                recovery_context,
1018                mut state_table_committed_epochs,
1019                mut state_table_log_epochs,
1020                mut mv_depended_subscriptions,
1021                mut background_jobs,
1022                hummock_version_stats,
1023                database_infos,
1024                mut cdc_table_snapshot_splits,
1025            } = runtime_info_snapshot;
1026
1027            let mut partial_graph_manager = PartialGraphManager::recover(
1028                    self.env.clone(),
1029                    active_streaming_nodes.current(),
1030                    self.context.clone(),
1031                )
1032                .await;
1033            {
1034                let mut empty_databases = HashSet::new();
1035                let mut collected_databases = HashMap::new();
1036                let mut collecting_databases = HashMap::new();
1037                let mut failed_databases = HashMap::new();
1038                for &database_id in recovery_context.fragment_context.database_map.keys() {
1039                    let mut recoverer = partial_graph_manager.start_recover();
1040                    let result: MetaResult<_> = try {
1041                        let Some(rendered_info) = render_runtime_info(
1042                            self.env.actor_id_generator(),
1043                            &active_streaming_nodes,
1044                            self.adaptive_parallelism_strategy,
1045                            &recovery_context,
1046                            database_id,
1047                        )
1048                            .inspect_err(|err: &MetaError| {
1049                                warn!(%database_id, err = %err.as_report(), "render runtime info failed");
1050                            })? else {
1051                            empty_databases.insert(database_id);
1052                            continue;
1053                        };
1054                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
1055                            database_id,
1056                            &rendered_info.job_infos,
1057                            &active_streaming_nodes,
1058                            &rendered_info.stream_actors,
1059                            &state_table_committed_epochs,
1060                        )
1061                        .inspect_err(|err| {
1062                            warn!(%database_id, err = %err.as_report(), "rendered runtime info failed validation");
1063                        })?;
1064                        let RenderedDatabaseRuntimeInfo {
1065                            job_infos,
1066                            stream_actors,
1067                            mut source_splits,
1068                        } = rendered_info;
1069                        recoverer.inject_database_initial_barrier(
1070                            database_id,
1071                            job_infos,
1072                            &recovery_context.job_extra_info,
1073                            &mut state_table_committed_epochs,
1074                            &mut state_table_log_epochs,
1075                            &recovery_context.fragment_relations,
1076                            &stream_actors,
1077                            &mut source_splits,
1078                            &mut background_jobs,
1079                            &mut mv_depended_subscriptions,
1080                            is_paused,
1081                            &hummock_version_stats,
1082                            &mut cdc_table_snapshot_splits,
1083                        )?
1084                    };
1085                    let collector = match result {
1086                        Ok(database) => {
1087                            DatabaseInitialBarrierCollector {
1088                                database_id,
1089                                initializing_partial_graphs: recoverer.all_initializing(),
1090                                database,
1091                            }
1092                        }
1093                        Err(e) => {
1094                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
1095                            assert!(failed_databases.insert(database_id, recoverer.failed()).is_none(), "non-duplicate");
1096                            continue;
1097                        }
1098                    };
1099                    if !collector.is_collected() {
1100                        assert!(collecting_databases.insert(database_id, collector).is_none());
1101                    } else {
1102                        warn!(%database_id, "database has no node to inject initial barrier");
1103                        assert!(collected_databases.insert(database_id, collector.finish()).is_none());
1104                    }
1105                }
1106                if !empty_databases.is_empty() {
1107                    info!(?empty_databases, "empty database in global recovery");
1108                }
1109                while !collecting_databases.is_empty() {
1110                    match partial_graph_manager.next_event(&self.context).await {
1111                        PartialGraphManagerEvent::Worker(_, WorkerEvent::WorkerConnected) => {
1112                            // not handle WorkerConnected yet
1113                        }
1114                        PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
1115                            let affected_databases: HashSet<_> = affected_partial_graphs.into_iter().map(|partial_graph_id| {
1116                                let (database_id, _) = from_partial_graph_id(partial_graph_id);
1117                                database_id
1118                            }).collect();
1119                            warn!(%worker_id, err = %err.as_report(), "worker node failure during recovery");
1120                            for (failed_database_id, collector) in collecting_databases.extract_if(|database_id, collector| {
1121                                !collector.is_valid_after_worker_err(worker_id) || affected_databases.contains(database_id)
1122                            }) {
1123                                warn!(%failed_database_id, %worker_id, "database failed to recovery in global recovery due to worker node err");
1124                                let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
1125                                partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1126                                assert!(failed_databases.insert(failed_database_id, resetting_partial_graphs).is_none());
1127                            }
1128                        }
1129                        PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
1130                            match event {
1131                                PartialGraphEvent::BarrierCollected(_) => {
1132                                    unreachable!("no barrier collected event on initializing")
1133                                }
1134                                PartialGraphEvent::Reset(_) => {
1135                                    unreachable!("no partial graph reset on initializing")
1136                                }
1137                                PartialGraphEvent::Error(worker_id) => {
1138                                    let (database_id, _) = from_partial_graph_id(partial_graph_id);
1139                                    if let Some(collector) = collecting_databases.remove(&database_id) {
1140                                        warn!(%database_id, %worker_id, "database reset during global recovery");
1141                                        let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
1142                                        partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1143                                        assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
1144                                    } else if let Some(database) = collected_databases.remove(&database_id) {
1145                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
1146                                        let resetting_partial_graphs: HashSet<_> = database_partial_graphs(database_id, database.creating_streaming_job_controls.keys().copied()).collect();
1147                                        partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1148                                        assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
1149                                    } else {
1150                                        assert!(failed_databases.contains_key(&database_id));
1151                                    }
1152                                }
1153                                PartialGraphEvent::Initialized => {
1154                                    let (database_id, _) = from_partial_graph_id(partial_graph_id);
1155                                    if failed_databases.contains_key(&database_id) {
1156                                        assert!(!collecting_databases.contains_key(&database_id));
1157                                        // ignore the lately initialized partial graph of failed database
1158                                        continue;
1159                                    }
1160                                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
1161                                        unreachable!("should exist")
1162                                    };
1163                                    let collector = entry.get_mut();
1164                                    collector.partial_graph_initialized(partial_graph_id);
1165                                    if collector.is_collected() {
1166                                        let collector = entry.remove();
1167                                        assert!(collected_databases.insert(database_id, collector.finish()).is_none());
1168                                    }
1169                                }
1170                            }
1171                        }
1172                    }
1173                }
1174                debug!("collected initial barrier");
1175                if !background_jobs.is_empty() {
1176                    warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background mview in recovery");
1177                }
1178                if !mv_depended_subscriptions.is_empty() {
1179                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
1180                }
1181                if !state_table_committed_epochs.is_empty() {
1182                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
1183                }
1184                if !enable_per_database_isolation && !failed_databases.is_empty() {
1185                    return Err(anyhow!(
1186                        "global recovery failed due to failure of databases {:?}",
1187                        failed_databases.keys().collect_vec()).into()
1188                    );
1189                }
1190                let checkpoint_control = CheckpointControl::recover(
1191                    collected_databases,
1192                    failed_databases,
1193                    hummock_version_stats,
1194                    self.env.clone(),
1195                );
1196
1197                let reader = self.env.system_params_reader().await;
1198                let checkpoint_frequency = reader.checkpoint_frequency();
1199                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
1200                let periodic_barriers = PeriodicBarriers::new(
1201                    barrier_interval,
1202                    checkpoint_frequency,
1203                    database_infos,
1204                );
1205
1206                Ok((
1207                    active_streaming_nodes,
1208                    partial_graph_manager,
1209                    checkpoint_control,
1210                    periodic_barriers,
1211                ))
1212            }
1213        }.inspect_err(|err: &MetaError| {
1214            tracing::error!(error = %err.as_report(), "recovery failed");
1215            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1216                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
1217            )]);
1218            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
1219        }))
1220        .instrument(tracing::info_span!("recovery_attempt"));
1221
1222        let mut recover_txs = vec![];
1223        let mut update_barrier_requests = vec![];
1224        pin_mut!(recovery_future);
1225        let mut request_rx_closed = false;
1226        let new_state = loop {
1227            select! {
1228                biased;
1229                new_state = &mut recovery_future => {
1230                    break new_state.expect("Retry until recovery success.");
1231                }
1232                request = pin!(self.request_rx.recv()), if !request_rx_closed => {
1233                    let Some(request) = request else {
1234                        warn!("request rx channel closed during recovery");
1235                        request_rx_closed = true;
1236                        continue;
1237                    };
1238                    match request {
1239                        BarrierManagerRequest::GetBackfillProgress(tx) => {
1240                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1241                        }
1242                        BarrierManagerRequest::GetFragmentBackfillProgress(tx) => {
1243                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1244                        }
1245                        BarrierManagerRequest::GetCdcProgress(tx) => {
1246                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1247                        }
1248                        BarrierManagerRequest::AdhocRecovery(tx) => {
1249                            recover_txs.push(tx);
1250                        }
1251                        BarrierManagerRequest::UpdateDatabaseBarrier(request) => {
1252                            update_barrier_requests.push(request);
1253                        }
1254                        BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
1255                            // may recover snapshot backfill jobs
1256                            let _ = tx.send(true);
1257                        }
1258                    }
1259                }
1260            }
1261        };
1262
1263        let duration = recovery_timer.stop_and_record();
1264
1265        (
1266            self.active_streaming_nodes,
1267            self.partial_graph_manager,
1268            self.checkpoint_control,
1269            self.periodic_barriers,
1270        ) = new_state;
1271
1272        tracing::info!("recovery success");
1273
1274        for UpdateDatabaseBarrierRequest {
1275            database_id,
1276            barrier_interval_ms,
1277            checkpoint_frequency,
1278            sender,
1279        } in update_barrier_requests
1280        {
1281            self.periodic_barriers.update_database_barrier(
1282                database_id,
1283                barrier_interval_ms,
1284                checkpoint_frequency,
1285            );
1286            let _ = sender.send(());
1287        }
1288
1289        for tx in recover_txs {
1290            let _ = tx.send(());
1291        }
1292
1293        let recovering_databases = self
1294            .checkpoint_control
1295            .recovering_databases()
1296            .map(|database| database.as_raw_id())
1297            .collect_vec();
1298        let running_databases = self
1299            .checkpoint_control
1300            .running_databases()
1301            .map(|database| database.as_raw_id())
1302            .collect_vec();
1303
1304        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1305            EventRecovery::global_recovery_success(
1306                recovery_reason.clone(),
1307                duration as f32,
1308                running_databases,
1309                recovering_databases,
1310            ),
1311        )]);
1312
1313        self.env
1314            .notification_manager()
1315            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
1316        self.env
1317            .notification_manager()
1318            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
1319    }
1320}