risingwave_meta/barrier/
worker.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::pin::pin;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use arc_swap::ArcSwap;
24use futures::{TryFutureExt, pin_mut};
25use itertools::Itertools;
26use risingwave_common::catalog::DatabaseId;
27use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
28use risingwave_common::system_param::reader::SystemParamsRead;
29use risingwave_meta_model::WorkerId;
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::meta::Recovery;
32use risingwave_pb::meta::subscribe_response::{Info, Operation};
33use thiserror_ext::AsReport;
34use tokio::select;
35use tokio::sync::mpsc;
36use tokio::sync::oneshot::{Receiver, Sender};
37use tokio::task::JoinHandle;
38use tonic::Status;
39use tracing::{Instrument, debug, error, info, warn};
40
41use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
42use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
43use crate::barrier::context::recovery::{RenderedDatabaseRuntimeInfo, render_runtime_info};
44use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
45use crate::barrier::info::InflightDatabaseInfo;
46use crate::barrier::rpc::{
47    DatabaseInitialBarrierCollector, database_partial_graphs, from_partial_graph_id,
48    merge_node_rpc_errors,
49};
50use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
51use crate::barrier::{
52    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
53    RecoveryReason, RescheduleContext, UpdateDatabaseBarrierRequest, schedule,
54};
55use crate::controller::scale::{materialize_actor_assignments, preview_actor_assignments};
56use crate::error::MetaErrorInner;
57use crate::hummock::HummockManagerRef;
58use crate::manager::sink_coordination::SinkCoordinatorManager;
59use crate::manager::{
60    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
61    MetadataManager,
62};
63use crate::rpc::metrics::GLOBAL_META_METRICS;
64use crate::stream::{
65    GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef, build_reschedule_commands,
66    rendered_layout_matches_current,
67};
68use crate::{MetaError, MetaResult};
69
/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
/// collects them, with monotonically increasing epoch numbers. On compute nodes, `LocalBarrierManager`
/// in the `risingwave_stream` crate serves these requests and dispatches them to source actors.
///
/// Configuration changes in our system are achieved by mutations carried in barriers. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state machine,
/// accepting [`crate::barrier::command::Command`] that carries the info needed to build a `Mutation`.
/// To keep the barrier manager and the meta store consistent, some actions like "drop materialized
/// view" or "create mv on mv" must be done transactionally in the barrier manager using
/// [`crate::barrier::command::Command`].
pub(super) struct GlobalBarrierWorker<C> {
    /// Enable recovery or not when failover.
    /// Copied from `env.opts.enable_recovery` at construction. When `false`, the worker panics
    /// instead of recovering from stream failures.
    enable_recovery: bool,

    /// Periodic barrier scheduling state. The system-wide barrier interval and checkpoint
    /// frequency are updated on system parameter changes, and per-database settings through
    /// `BarrierManagerRequest::UpdateDatabaseBarrier`.
    periodic_barriers: PeriodicBarriers,

    /// Whether per database failure isolation is enabled in system parameters.
    /// This is only the system-parameter half of the decision. The license check is applied
    /// separately by `enable_per_database_isolation`.
    system_enable_per_database_isolation: bool,

    /// Context through which the worker reaches the rest of the meta service.
    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    /// Tracks in-flight barriers and per-database state, and emits `CheckpointControlEvent`s.
    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    /// Incoming requests (progress queries, adhoc recovery, per-database barrier updates, ...).
    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    /// Set of active streaming workers. Its changes are forwarded to `partial_graph_manager`
    /// as worker additions and removals.
    active_streaming_nodes: ActiveStreamingWorkerNodes,

    /// Manages per-worker connections and partial graphs, and emits worker and partial-graph
    /// events.
    partial_graph_manager: PartialGraphManager,
}
105
#[cfg(test)]
mod tests {
    // `HashMap`, `RescheduleContext`, `DatabaseId`, ... all come from the parent module via
    // `super::*`; only names the parent does not import are brought in explicitly.
    use tokio::sync::oneshot;

    use super::*;
    use crate::barrier::notifier::Notifier;

    /// Resolving a `RescheduleIntent` with no active streaming workers must not yield a
    /// barrier, and the caller's `started` notifier must receive an error.
    #[tokio::test]
    async fn test_reschedule_intent_without_workers_notifies_start_failed() {
        let env = MetaSrvEnv::for_test().await;
        let database_id = DatabaseId::new(1);
        let database_info =
            InflightDatabaseInfo::empty(database_id, env.shared_actor_infos().clone());
        let (started_tx, started_rx) = oneshot::channel();
        // The receiver is bound to a named (not `_`) pattern so it stays alive for the
        // whole test rather than being dropped immediately.
        let (collected_tx, _collected_rx) = oneshot::channel();

        let notifier = Notifier {
            started: Some(started_tx),
            collected: Some(collected_tx),
        };

        let new_barrier = schedule::NewBarrier {
            database_id,
            command: Some((
                Command::RescheduleIntent {
                    context: RescheduleContext::empty(),
                    reschedule_plan: None,
                },
                vec![notifier],
            )),
            span: tracing::Span::none(),
            checkpoint: false,
        };

        // An empty worker map makes plan building fail.
        let result =
            resolve_reschedule_intent(env, HashMap::new(), Some(&database_info), new_barrier);

        assert!(matches!(result, Ok(None)));
        let started = started_rx.await.expect("started notifier dropped");
        assert!(started.is_err());
    }
}
151
152impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
153    pub(super) async fn new_inner(
154        env: MetaSrvEnv,
155        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
156        context: Arc<C>,
157    ) -> Self {
158        let enable_recovery = env.opts.enable_recovery;
159
160        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();
161
162        let partial_graph_manager = PartialGraphManager::uninitialized(env.clone());
163
164        let reader = env.system_params_reader().await;
165        let system_enable_per_database_isolation = reader.per_database_isolation();
166        // Load config will be performed in bootstrap phase.
167        let periodic_barriers = PeriodicBarriers::default();
168
169        let checkpoint_control = CheckpointControl::new(env.clone());
170        Self {
171            enable_recovery,
172            periodic_barriers,
173            system_enable_per_database_isolation,
174            context,
175            env,
176            checkpoint_control,
177            completing_task: CompletingTask::None,
178            request_rx,
179            active_streaming_nodes,
180            partial_graph_manager,
181        }
182    }
183}
184
185fn resolve_reschedule_intent(
186    env: MetaSrvEnv,
187    worker_nodes: HashMap<WorkerId, WorkerNode>,
188    database_info: Option<&InflightDatabaseInfo>,
189    mut new_barrier: schedule::NewBarrier,
190) -> MetaResult<Option<schedule::NewBarrier>> {
191    let Some((command, notifiers)) = new_barrier.command.take() else {
192        return Ok(Some(new_barrier));
193    };
194
195    match command {
196        Command::RescheduleIntent {
197            context,
198            reschedule_plan,
199        } => {
200            if let Some(reschedule_plan) = reschedule_plan {
201                new_barrier.command = Some((
202                    Command::RescheduleIntent {
203                        context,
204                        reschedule_plan: Some(reschedule_plan),
205                    },
206                    notifiers,
207                ));
208                return Ok(Some(new_barrier));
209            }
210            let span = tracing::info_span!(
211                "resolve_reschedule_intent",
212                database_id = %new_barrier.database_id
213            );
214            let reschedule_plan = {
215                let _guard = span.enter();
216                build_reschedule_from_context(
217                    &env,
218                    worker_nodes,
219                    new_barrier.database_id,
220                    context,
221                    database_info.ok_or_else(|| {
222                        anyhow!(
223                            "database {} not found when resolving reschedule intent",
224                            new_barrier.database_id
225                        )
226                    })?,
227                )
228            };
229            match reschedule_plan {
230                Ok(Some(reschedule_plan)) => {
231                    new_barrier.command = Some((
232                        Command::RescheduleIntent {
233                            context: RescheduleContext::empty(),
234                            reschedule_plan: Some(reschedule_plan),
235                        },
236                        notifiers,
237                    ));
238                    Ok(Some(new_barrier))
239                }
240                Ok(None) => {
241                    // No-op intent: notify to unblock callers even though no barrier is injected.
242                    for mut notifier in notifiers {
243                        notifier.notify_started();
244                        notifier.notify_collected();
245                    }
246                    Ok(None)
247                }
248                Err(err) => {
249                    for notifier in notifiers {
250                        notifier.notify_start_failed(err.clone());
251                    }
252                    Ok(None)
253                }
254            }
255        }
256        _ => {
257            new_barrier.command = Some((command, notifiers));
258            Ok(Some(new_barrier))
259        }
260    }
261}
262
263fn build_reschedule_from_context(
264    env: &MetaSrvEnv,
265    worker_nodes: HashMap<WorkerId, WorkerNode>,
266    database_id: DatabaseId,
267    context: RescheduleContext,
268    database_info: &InflightDatabaseInfo,
269) -> MetaResult<Option<crate::barrier::ReschedulePlan>> {
270    if worker_nodes.is_empty() {
271        return Err(anyhow!("no active streaming workers for reschedule").into());
272    }
273
274    if context.is_empty() {
275        return Ok(None);
276    }
277
278    // Barrier worker resolves this intent against a stable in-flight snapshot.
279    // Reuse the same fragment view for preview comparison and command building.
280    let all_prev_fragments = database_info
281        .fragment_infos()
282        .map(|fragment| (fragment.fragment_id, fragment))
283        .collect();
284
285    let previewed = preview_actor_assignments(&worker_nodes, &context.loaded)?;
286
287    if rendered_layout_matches_current(&previewed.fragments, &all_prev_fragments)? {
288        return Ok(None);
289    }
290
291    let actor_id_counter = env.actor_id_generator();
292    // Materialization only replaces preview actor ids with real ids. Worker
293    // placement, vnode ownership, and split assignment remain unchanged.
294    let rendered = materialize_actor_assignments(actor_id_counter, previewed);
295    let mut commands = build_reschedule_commands(rendered.fragments, context, all_prev_fragments)?;
296    Ok(commands.remove(&database_id))
297}
298
299impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
300    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
301    pub async fn new(
302        scheduled_barriers: schedule::ScheduledBarriers,
303        env: MetaSrvEnv,
304        metadata_manager: MetadataManager,
305        hummock_manager: HummockManagerRef,
306        source_manager: SourceManagerRef,
307        sink_manager: SinkCoordinatorManager,
308        scale_controller: ScaleControllerRef,
309        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
310        barrier_scheduler: schedule::BarrierScheduler,
311        refresh_manager: GlobalRefreshManagerRef,
312    ) -> Self {
313        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));
314
315        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
316            scheduled_barriers,
317            status,
318            metadata_manager,
319            hummock_manager,
320            source_manager,
321            scale_controller,
322            env.clone(),
323            barrier_scheduler,
324            refresh_manager,
325            sink_manager,
326        ));
327
328        Self::new_inner(env, request_rx, context).await
329    }
330
331    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
332        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
333        let fut = (self.env.await_tree_reg())
334            .register_derived_root("Global Barrier Worker")
335            .instrument(self.run(shutdown_rx));
336        let join_handle = tokio::spawn(fut);
337
338        (join_handle, shutdown_tx)
339    }
340
341    /// Check whether we should pause on bootstrap from the system parameter and reset it.
342    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
343        let paused = self
344            .env
345            .system_params_reader()
346            .await
347            .pause_on_next_bootstrap()
348            || self.env.opts.pause_on_next_bootstrap_offline;
349
350        if paused {
351            warn!(
352                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
353                 It will now be reset to `false`. \
354                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
355                PAUSE_ON_NEXT_BOOTSTRAP_KEY
356            );
357            self.env
358                .system_params_manager_impl_ref()
359                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
360                .await?;
361        }
362        Ok(paused)
363    }
364
365    /// Start an infinite loop to take scheduled barriers and send them.
366    async fn run(mut self, shutdown_rx: Receiver<()>) {
367        tracing::info!(
368            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
369            self.enable_recovery,
370            self.checkpoint_control.in_flight_barrier_nums,
371        );
372
373        if !self.enable_recovery {
374            let job_exist = self
375                .context
376                .metadata_manager
377                .catalog_controller
378                .has_any_streaming_jobs()
379                .await
380                .unwrap();
381            if job_exist {
382                panic!(
383                    "Some streaming jobs already exist in meta, please start with recovery enabled \
384                or clean up the metadata using `./risedev clean-data`"
385                );
386            }
387        }
388
389        {
390            // Bootstrap recovery. Here we simply trigger a recovery process to achieve the
391            // consistency.
392            // Even if there's no actor to recover, we still go through the recovery process to
393            // inject the first `Initial` barrier.
394            let span = tracing::info_span!("bootstrap_recovery");
395            crate::telemetry::report_event(
396                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
397                "normal_recovery",
398                0,
399                None,
400                None,
401                None,
402            );
403
404            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
405
406            self.recovery(paused, RecoveryReason::Bootstrap)
407                .instrument(span)
408                .await;
409        }
410
411        Box::pin(self.run_inner(shutdown_rx)).await
412    }
413}
414
415impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
416    fn enable_per_database_isolation(&self) -> bool {
417        self.system_enable_per_database_isolation && {
418            if let Err(e) =
419                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
420            {
421                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
422                false
423            } else {
424                true
425            }
426        }
427    }
428
429    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
430        let (local_notification_tx, mut local_notification_rx) =
431            tokio::sync::mpsc::unbounded_channel();
432        self.env
433            .notification_manager()
434            .insert_local_sender(local_notification_tx);
435
436        // Start the event loop.
437        loop {
438            tokio::select! {
439                biased;
440
441                // Shutdown
442                _ = &mut shutdown_rx => {
443                    tracing::info!("Barrier manager is stopped");
444                    break;
445                }
446
447                request = self.request_rx.recv() => {
448                    if let Some(request) = request {
449                        match request {
450                            BarrierManagerRequest::GetBackfillProgress(result_tx) => {
451                                let progress = self.checkpoint_control.gen_backfill_progress();
452                                if result_tx.send(Ok(progress)).is_err() {
453                                    error!("failed to send get ddl progress");
454                                }
455                            }
456                            BarrierManagerRequest::GetFragmentBackfillProgress(result_tx) => {
457                                let progress =
458                                    self.checkpoint_control.gen_fragment_backfill_progress();
459                                if result_tx.send(Ok(progress)).is_err() {
460                                    error!("failed to send get fragment backfill progress");
461                                }
462                            }
463                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
464                                let progress = self.checkpoint_control.gen_cdc_progress();
465                                if result_tx.send(Ok(progress)).is_err() {
466                                    error!("failed to send get ddl progress");
467                                }
468                            }
469                            // Handle adhoc recovery triggered by user.
470                            BarrierManagerRequest::AdhocRecovery(sender) => {
471                                self.adhoc_recovery().await;
472                                if sender.send(()).is_err() {
473                                    warn!("failed to notify finish of adhoc recovery");
474                                }
475                            }
476                            BarrierManagerRequest::UpdateDatabaseBarrier( UpdateDatabaseBarrierRequest {
477                                database_id,
478                                barrier_interval_ms,
479                                checkpoint_frequency,
480                                sender,
481                            }) => {
482                                self.periodic_barriers
483                                    .update_database_barrier(
484                                        database_id,
485                                        barrier_interval_ms,
486                                        checkpoint_frequency,
487                                    );
488                                if sender.send(()).is_err() {
489                                    warn!("failed to notify finish of update database barrier");
490                                }
491                            }
492                            BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
493                                if tx.send(self.checkpoint_control.may_have_snapshot_backfilling_jobs()).is_err() {
494                                    warn!("failed to may have snapshot backfill job");
495                                }
496                            }
497                        }
498                    } else {
499                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
500                        return;
501                    }
502                }
503
504                changed_worker = self.active_streaming_nodes.changed() => {
505                    #[cfg(debug_assertions)]
506                    {
507                        self.active_streaming_nodes.validate_change().await;
508                    }
509
510                    info!(?changed_worker, "worker changed");
511
512                    match changed_worker {
513                        ActiveStreamingWorkerChange::Add(node)
514                        | ActiveStreamingWorkerChange::Update(node) => {
515                            self.partial_graph_manager
516                                .add_worker(node, self.context.clone())
517                                .await;
518                        }
519                        ActiveStreamingWorkerChange::Remove(node) => {
520                            self.partial_graph_manager.remove_worker(node);
521                        }
522                    }
523                }
524
525                notification = local_notification_rx.recv() => {
526                    let notification = notification.unwrap();
527                    if let LocalNotification::SystemParamsChange(p) = notification {
528                        {
529                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
530                            self.periodic_barriers
531                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
532                            self.system_enable_per_database_isolation = p.per_database_isolation();
533                        }
534                    }
535                }
536                complete_result = self
537                    .completing_task
538                    .next_completed_barrier(
539                        &mut self.periodic_barriers,
540                        &mut self.checkpoint_control,
541                        &mut self.partial_graph_manager,
542                        &self.context,
543                        &self.env,
544                ) => {
545                    match complete_result {
546                        Ok(output) => {
547                            self.checkpoint_control.ack_completed(&mut self.partial_graph_manager, output);
548                        }
549                        Err(e) => {
550                            self.failure_recovery(e).await;
551                        }
552                    }
553                },
554                event = self.checkpoint_control.next_event() => {
555                    let result: MetaResult<()> = try {
556                        match event {
557                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
558                                let database_id = entering_initializing.database_id();
559                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
560                                    resp.root_err.as_ref().map(|root_err| {
561                                        (*worker_id, ScoredError {
562                                            error: Status::internal(&root_err.err_msg),
563                                            score: Score(root_err.score)
564                                        })
565                                    })
566                                }));
567                                Self::report_collect_failure(&self.env, &error);
568                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
569                                let result: MetaResult<_> = try {
570                                    let runtime_info = self.context.reload_database_runtime_info(database_id).await.inspect_err(|err| {
571                                        warn!(%database_id, err = %err.as_report(), "reload runtime info failed");
572                                    })?;
573                                    let rendered_info = render_runtime_info(
574                                        self.env.actor_id_generator(),
575                                        &self.active_streaming_nodes,
576                                        &runtime_info.recovery_context,
577                                        database_id,
578                                    )
579                                    .inspect_err(|err: &MetaError| {
580                                        warn!(%database_id, err = %err.as_report(), "render runtime info failed");
581                                    })?;
582                                    if let Some(rendered_info) = rendered_info {
583                                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
584                                            database_id,
585                                            &rendered_info.job_infos,
586                                            &self.active_streaming_nodes,
587                                            &rendered_info.stream_actors,
588                                            &runtime_info.state_table_committed_epochs,
589                                        )
590                                        .inspect_err(|err| {
591                                            warn!(%database_id, err = ?err.as_report(), "database runtime info failed validation");
592                                        })?;
593                                        Some((runtime_info, rendered_info))
594                                    } else {
595                                        None
596                                    }
597                                };
598                                match result {
599                                    Ok(Some((runtime_info, rendered_info))) => {
600                                        entering_initializing.enter(
601                                            runtime_info,
602                                            rendered_info,
603                                            &mut self.partial_graph_manager,
604                                        );
605                                    }
606                                    Ok(None) => {
607                                        info!(%database_id, "database removed after reloading empty runtime info");
608                                        // mark ready to unblock subsequent request
609                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
610                                        entering_initializing.remove();
611                                    }
612                                    Err(e) => {
613                                        entering_initializing.fail_reload_runtime_info(e);
614                                    }
615                                }
616                            }
617                            CheckpointControlEvent::EnteringRunning(entering_running) => {
618                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
619                                entering_running.enter();
620                            }
621                        }
622                    };
623                    if let Err(e) = result {
624                        self.failure_recovery(e).await;
625                    }
626                }
627                event = self.partial_graph_manager.next_event(&self.context) => {
628                    let result: MetaResult<()> = try {
629                        match event {
630                            PartialGraphManagerEvent::Worker(_worker_id, WorkerEvent::WorkerConnected) => {
631                                // no handling on new worker connected event yet
632                            }
633                            PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
634                                let failed_databases = self
635                                    .checkpoint_control
636                                    .databases_failed_at_worker_err(worker_id)
637                                    .chain(
638                                        affected_partial_graphs
639                                        .into_iter()
640                                        .map(|partial_graph_id| {
641                                            let (database_id, _) = from_partial_graph_id(partial_graph_id);
642                                            database_id
643                                        })
644                                    )
645                                    .collect::<HashSet<_>>();
646                                if !failed_databases.is_empty() {
647                                    if !self.enable_recovery {
648                                        panic!("control stream to worker {} failed but recovery not enabled: {}", worker_id, err.as_report());
649                                    }
650                                    if !self.enable_per_database_isolation() {
651                                        Err(err.clone())?;
652                                    }
653                                    Self::report_collect_failure(&self.env, &err);
654                                    for database_id in failed_databases {
655                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
656                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
657                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
658                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
659                                            // TODO: add log on blocking time
660                                            let output = self.completing_task.wait_completing_task().await?;
661                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
662                                        }
663                                    }
664                                }  else {
665                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
666                                }
667                                continue;
668                            }
669                            PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
670                                let (database_id, _creating_job_id) = from_partial_graph_id(partial_graph_id);
671                                match event {
672                                    PartialGraphEvent::BarrierCollected(collected_barrier) => {
673                                        self.checkpoint_control.barrier_collected(partial_graph_id, collected_barrier, &mut self.periodic_barriers)?;
674                                    }
675                                    PartialGraphEvent::Error(worker_id) => {
676                                        if !self.enable_recovery {
677                                            panic!("database {database_id} failure reported from {worker_id} but recovery not enabled")
678                                        }
679                                        if !self.enable_per_database_isolation() {
680                                                Err(anyhow!("database {database_id} report failure from {worker_id}"))?;
681                                            }
682                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
683                                            warn!(%database_id, "database entering recovery");
684                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
685                                            // TODO: add log on blocking time
686                                            let output = self.completing_task.wait_completing_task().await?;
687                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
688                                        }
689                                    }
690                                    PartialGraphEvent::Reset(reset_resps) => {
691                                        self.checkpoint_control.on_partial_graph_reset(partial_graph_id, reset_resps);
692                                    }
693                                    PartialGraphEvent::Initialized => {
694                                        self.checkpoint_control.on_partial_graph_initialized(partial_graph_id);
695                                    }
696                                }
697                            }
698                        };
699                    };
700                    if let Err(e) = result {
701                        self.failure_recovery(e).await;
702                    }
703                }
704                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
705                    let database_id = new_barrier.database_id;
706                    let new_barrier = if matches!(
707                        new_barrier.command,
708                        Some((Command::RescheduleIntent { .. }, _))
709                    ) {
710                        let env = self.env.clone();
711                        let worker_nodes = self
712                            .active_streaming_nodes
713                            .current()
714                            .iter()
715                            .map(|(worker_id, worker)| (*worker_id, worker.clone()))
716                            .collect();
717                        let database_info = self.checkpoint_control.database_info(database_id);
718                        match resolve_reschedule_intent(
719                            env,
720                            worker_nodes,
721                            database_info,
722                            new_barrier,
723                        ) {
724                            Ok(Some(new_barrier)) => new_barrier,
725                            Ok(None) => continue,
726                            Err(err) => {
727                                self.failure_recovery(err).await;
728                                continue;
729                            }
730                        }
731                    } else {
732                        new_barrier
733                    };
734                    if let Err(e) = self.checkpoint_control.handle_new_barrier(
735                        new_barrier,
736                        &mut self.partial_graph_manager,
737                        self.active_streaming_nodes.current()
738                    ) {
739                        if !self.enable_recovery {
740                            panic!(
741                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
742                                    database_id,
743                                    e.as_report()
744                                )
745                            );
746                        }
747                        let result: MetaResult<_> = try {
748                            if !self.enable_per_database_isolation() {
749                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
750                                Err(err)?;
751                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
752                                warn!(%database_id, e = %e.as_report(),"database entering recovery on inject failure");
753                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
754                                // TODO: add log on blocking time
755                                let output = self.completing_task.wait_completing_task().await?;
756                                entering_recovery.enter(output, &mut self.partial_graph_manager);
757                            }
758                        };
759                        if let Err(e) = result {
760                            self.failure_recovery(e).await;
761                        }
762                    }
763                }
764            }
765        }
766    }
767}
768
769impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
770    /// We need to make sure there are no changes when doing recovery
771    pub async fn clear_on_err(&mut self, err: &MetaError) {
772        // join spawned completing command to finish no matter it succeeds or not.
773        match replace(&mut self.completing_task, CompletingTask::None) {
774            CompletingTask::None | CompletingTask::Err(_) => {}
775            CompletingTask::Completing {
776                epochs_to_ack,
777                join_handle,
778                ..
779            } => {
780                info!("waiting for completing command to finish in recovery");
781                match join_handle.await {
782                    Err(e) => {
783                        warn!(err = %e.as_report(), "failed to join completing task");
784                    }
785                    Ok(Err(e)) => {
786                        warn!(
787                            err = %e.as_report(),
788                            "failed to complete barrier during clear"
789                        );
790                    }
791                    Ok(Ok(hummock_version_stats)) => {
792                        self.checkpoint_control.ack_completed(
793                            &mut self.partial_graph_manager,
794                            BarrierCompleteOutput {
795                                epochs_to_ack,
796                                hummock_version_stats,
797                            },
798                        );
799                    }
800                }
801            }
802        };
803        self.partial_graph_manager.notify_all_err(err);
804    }
805}
806
807impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
808    /// Set barrier manager status.
809    async fn failure_recovery(&mut self, err: MetaError) {
810        self.clear_on_err(&err).await;
811
812        if self.enable_recovery {
813            let span = tracing::info_span!(
814                "failure_recovery",
815                error = %err.as_report(),
816            );
817
818            crate::telemetry::report_event(
819                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
820                "failure_recovery",
821                0,
822                None,
823                None,
824                None,
825            );
826
827            let reason = RecoveryReason::Failover(err);
828
829            // No need to clean dirty tables for barrier recovery,
830            // The foreground stream job should cleanup their own tables.
831            self.recovery(false, reason).instrument(span).await;
832        } else {
833            panic!(
834                "a streaming error occurred while recovery is disabled, aborting: {:?}",
835                err.as_report()
836            );
837        }
838    }
839
840    async fn adhoc_recovery(&mut self) {
841        let err = MetaErrorInner::AdhocRecovery.into();
842        self.clear_on_err(&err).await;
843
844        let span = tracing::info_span!(
845            "adhoc_recovery",
846            error = %err.as_report(),
847        );
848
849        crate::telemetry::report_event(
850            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
851            "adhoc_recovery",
852            0,
853            None,
854            None,
855            None,
856        );
857
858        // No need to clean dirty tables for barrier recovery,
859        // The foreground stream job should cleanup their own tables.
860        self.recovery(false, RecoveryReason::Adhoc)
861            .instrument(span)
862            .await;
863    }
864}
865
866impl<C> GlobalBarrierWorker<C> {
867    /// Send barrier-complete-rpc and wait for responses from all CNs
868    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
869        // Record failure in event log.
870        use risingwave_pb::meta::event_log;
871        let event = event_log::EventCollectBarrierFail {
872            error: error.to_report_string(),
873        };
874        env.event_log_manager_ref()
875            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
876    }
877}
878
/// Backoff helpers used by the barrier worker while retrying recovery.
///
/// Delays come from `tokio_retry`'s `ExponentialBackoff`. Each delay is capped at
/// `RECOVERY_RETRY_MAX_INTERVAL` and then randomized with `jitter`.
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Base of the exponential backoff, in milliseconds.
    // NOTE(review): per the `tokio_retry` docs, `ExponentialBackoff::from_millis` raises this
    // base to the power of the attempt count (20ms, 400ms, 8000ms -> capped, ...). It is
    // therefore not a constant interval; confirm against the pinned `tokio_retry` version.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Upper bound applied to each delay before jitter is applied.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    // MrCroxx: Use concrete type here to prevent unsolved compiler issue.
    // Feel free to replace the concrete type with TAIT after fixed.
    //
    // The commented-out module below is the TAIT-based version of `RetryBackoffFuture` and
    // `get_retry_backoff_future`. It is kept for reference until that issue is resolved.

    // mod retry_backoff_future {
    //     use std::future::Future;
    //     use std::time::Duration;
    //
    //     use tokio::time::sleep;
    //
    //     pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
    //
    //     #[define_opaque(RetryBackoffFuture)]
    //     pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
    //         Box::pin(sleep(duration))
    //     }
    // }
    // pub(crate) use retry_backoff_future::*;

    /// Future that sleeps for one backoff delay (a boxed, pinned `tokio` sleep).
    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    /// Create a `RetryBackoffFuture` that completes after `duration`.
    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    /// Opaque iterator of backoff futures, one per retry.
    ///
    /// Its concrete type is defined by `get_retry_backoff_strategy`.
    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// Initialize a retry strategy for operation in recovery.
    ///
    /// Yields exponential-backoff delays. Each delay is capped at
    /// `RECOVERY_RETRY_MAX_INTERVAL` and then passed through `jitter`.
    /// No attempt limit (e.g. `.take(n)`) is applied here.
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    /// The delays of `get_retry_strategy`, each turned into a sleep future via
    /// `get_retry_backoff_future`.
    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}
929
930pub(crate) use retry_strategy::*;
931use risingwave_common::error::tonic::extra::{Score, ScoredError};
932use risingwave_pb::meta::event_log::{Event, EventRecovery};
933
934use crate::barrier::partial_graph::{
935    PartialGraphEvent, PartialGraphManager, PartialGraphManagerEvent, WorkerEvent,
936};
937
impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Recover the whole cluster from the latest epoch.
    ///
    /// Steps:
    /// 1. Drop all control streams to compute nodes.
    /// 2. Call `abort_and_mark_blocked(None, recovery_reason)`. Other call sites pass
    ///    `Some(database_id)`, so `None` presumably targets every database.
    /// 3. Run `recovery_inner`, which keeps retrying until an attempt succeeds.
    /// 4. Mark the cluster ready, passing the databases still reported by
    ///    `CheckpointControl::recovering_databases` as `blocked_databases`.
    ///
    /// If `is_paused` is `true`, the initial barrier of every recovered database is injected
    /// in paused state. All data sources (including connectors and DMLs) then stay paused
    /// after recovery until the user manually resumes them, either by restarting the cluster
    /// or with a `risectl` command. Used for debugging purposes.
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Clear all control streams to release resources (connections to compute nodes) first.
        self.partial_graph_manager.clear_worker();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    /// Perform global recovery, retrying with `get_retry_strategy` until an attempt succeeds.
    ///
    /// Each attempt:
    /// 1. Reloads the runtime info snapshot.
    /// 2. Reconnects to the active streaming nodes.
    /// 3. Injects the initial barrier of every database and waits until each one is either
    ///    initialized or failed.
    ///
    /// Failed databases are passed to `CheckpointControl::recover`. When per-database
    /// isolation is disabled, any failed database instead fails the whole attempt, which
    /// makes it retry.
    ///
    /// While an attempt is in flight, incoming requests on `request_rx` are handled as follows:
    /// - progress queries are answered with a "cluster under recovery" error;
    /// - ad-hoc recovery requests and barrier-config updates are buffered and only served
    ///   after recovery succeeds.
    ///
    /// On success the new worker state is installed, an event log entry is written, and
    /// frontends and compute nodes are notified. `recovery_reason` is used only for logging,
    /// event logs and error messages.
    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        // We take retry into consideration because this is the latency user sees for a cluster to
        // get recovered.
        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        // Captured once and reused by every retry attempt below.
        let enable_per_database_isolation = self.enable_per_database_isolation();

        let recovery_future = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            // We need to notify_creating_job_failed in every recovery retry, because in outer create_streaming_job handler,
            // it holds the reschedule_read_lock and wait for creating job to finish, and caused the following scale_actor fail
            // to acquire the reschedule_write_lock, and then keep recovering, and then deadlock.
            // TODO: refactor and fix this hacky implementation.
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;

            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                recovery_context,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut mv_depended_subscriptions,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_splits,
            } = runtime_info_snapshot;

            // Fresh manager for this attempt; it replaces `self.partial_graph_manager` only
            // when the attempt succeeds.
            let mut partial_graph_manager = PartialGraphManager::recover(
                    self.env.clone(),
                    active_streaming_nodes.current(),
                    self.context.clone(),
                )
                .await;
            {
                // Every database ends up in exactly one of these:
                // - `empty_databases`: nothing to render;
                // - `collected_databases`: initialized;
                // - `collecting_databases`: waiting for initialization;
                // - `failed_databases`: database id -> partial graphs being reset.
                let mut empty_databases = HashSet::new();
                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashMap::new();
                // Render, validate and inject the initial barrier for each database
                // separately, so a failure is recorded per database rather than aborting
                // the whole attempt here.
                for &database_id in recovery_context.fragment_context.database_map.keys() {
                    let mut recoverer = partial_graph_manager.start_recover();
                    let result: MetaResult<_> = try {
                        let Some(rendered_info) = render_runtime_info(
                            self.env.actor_id_generator(),
                            &active_streaming_nodes,
                            &recovery_context,
                            database_id,
                        )
                            .inspect_err(|err: &MetaError| {
                                warn!(%database_id, err = %err.as_report(), "render runtime info failed");
                            })? else {
                            // Nothing was rendered for this database; skip it.
                            empty_databases.insert(database_id);
                            continue;
                        };
                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
                            database_id,
                            &rendered_info.job_infos,
                            &active_streaming_nodes,
                            &rendered_info.stream_actors,
                            &state_table_committed_epochs,
                        )
                        .inspect_err(|err| {
                            warn!(%database_id, err = %err.as_report(), "rendered runtime info failed validation");
                        })?;
                        let RenderedDatabaseRuntimeInfo {
                            job_infos,
                            stream_actors,
                            mut source_splits,
                            batch_refresh,
                        } = rendered_info;
                        // Consumes this database's entries from the shared snapshot maps passed
                        // by `&mut`; leftovers are reported as "unused" after the loop.
                        recoverer.inject_database_initial_barrier(
                            database_id,
                            job_infos,
                            &recovery_context.job_extra_info,
                            &mut state_table_committed_epochs,
                            &mut state_table_log_epochs,
                            &recovery_context.fragment_relations,
                            &stream_actors,
                            &mut source_splits,
                            &mut background_jobs,
                            &mut mv_depended_subscriptions,
                            is_paused,
                            &hummock_version_stats,
                            &mut cdc_table_snapshot_splits,
                            batch_refresh,
                        )?
                    };
                    let collector = match result {
                        Ok(database) => {
                            DatabaseInitialBarrierCollector {
                                database_id,
                                initializing_partial_graphs: recoverer.all_initializing(),
                                database,
                            }
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id, recoverer.failed()).is_none(), "non-duplicate");
                            continue;
                        }
                    };
                    if !collector.is_collected() {
                        assert!(collecting_databases.insert(database_id, collector).is_none());
                    } else {
                        warn!(%database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, collector.finish()).is_none());
                    }
                }
                if !empty_databases.is_empty() {
                    info!(?empty_databases, "empty database in global recovery");
                }
                // Drive partial-graph events until no database is still initializing: each one
                // either moves to `collected_databases` or to `failed_databases`.
                while !collecting_databases.is_empty() {
                    match partial_graph_manager.next_event(&self.context).await {
                        PartialGraphManagerEvent::Worker(_, WorkerEvent::WorkerConnected) => {
                            // not handle WorkerConnected yet
                        }
                        PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
                            let affected_databases: HashSet<_> = affected_partial_graphs.into_iter().map(|partial_graph_id| {
                                let (database_id, _) = from_partial_graph_id(partial_graph_id);
                                database_id
                            }).collect();
                            warn!(%worker_id, err = %err.as_report(), "worker node failure during recovery");
                            // Fail every still-collecting database that cannot tolerate losing this
                            // worker or owns one of the affected partial graphs, and reset its graphs.
                            // NOTE(review): databases already in `collected_databases` are not
                            // re-checked against `affected_databases` here; confirm that is intended.
                            for (failed_database_id, collector) in collecting_databases.extract_if(|database_id, collector| {
                                !collector.is_valid_after_worker_err(worker_id) || affected_databases.contains(database_id)
                            }) {
                                warn!(%failed_database_id, %worker_id, "database failed to recovery in global recovery due to worker node err");
                                let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
                                partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
                                assert!(failed_databases.insert(failed_database_id, resetting_partial_graphs).is_none());
                            }
                        }
                        PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
                            match event {
                                PartialGraphEvent::BarrierCollected(_) => {
                                    unreachable!("no barrier collected event on initializing")
                                }
                                PartialGraphEvent::Reset(_) => {
                                    unreachable!("no partial graph reset on initializing")
                                }
                                PartialGraphEvent::Error(worker_id) => {
                                    let (database_id, _) = from_partial_graph_id(partial_graph_id);
                                    // Whether the database is still collecting or already initialized,
                                    // move it to `failed_databases` and reset all of its partial graphs.
                                    if let Some(collector) = collecting_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database reset during global recovery");
                                        let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
                                        partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
                                        assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
                                    } else if let Some(database) = collected_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
                                        let resetting_partial_graphs: HashSet<_> = database_partial_graphs(database_id, database.independent_checkpoint_job_controls.keys().copied()).collect();
                                        partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
                                        assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
                                    } else {
                                        // Already failed earlier; its partial graphs are being reset.
                                        assert!(failed_databases.contains_key(&database_id));
                                    }
                                }
                                PartialGraphEvent::Initialized => {
                                    let (database_id, _) = from_partial_graph_id(partial_graph_id);
                                    if failed_databases.contains_key(&database_id) {
                                        assert!(!collecting_databases.contains_key(&database_id));
                                        // ignore the lately initialized partial graph of failed database
                                        continue;
                                    }
                                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                                        unreachable!("should exist")
                                    };
                                    let collector = entry.get_mut();
                                    collector.partial_graph_initialized(partial_graph_id);
                                    // Once all of its partial graphs are initialized, the database is done.
                                    if collector.is_collected() {
                                        let collector = entry.remove();
                                        assert!(collected_databases.insert(database_id, collector.finish()).is_none());
                                    }
                                }
                            }
                        }
                    }
                }
                debug!("collected initial barrier");
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background mview in recovery");
                }
                if !mv_depended_subscriptions.is_empty() {
                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                // Without per-database isolation, a single failed database fails the whole
                // attempt; returning `Err` makes `Retry` run another attempt.
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.keys().collect_vec()).into()
                    );
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    partial_graph_manager,
                    checkpoint_control,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"));

        // Requests that cannot be served mid-recovery are buffered here and completed after
        // the new state is installed.
        let mut recover_txs = vec![];
        let mut update_barrier_requests = vec![];
        pin_mut!(recovery_future);
        // Once the request channel is closed, stop polling it so `select!` does not spin on it.
        let mut request_rx_closed = false;
        let new_state = loop {
            select! {
                // Poll the recovery future before incoming requests.
                biased;
                new_state = &mut recovery_future => {
                    // The retry strategy applies no attempt limit, so this presumably only
                    // resolves with `Ok`.
                    break new_state.expect("Retry until recovery success.");
                }
                request = pin!(self.request_rx.recv()), if !request_rx_closed => {
                    let Some(request) = request else {
                        warn!("request rx channel closed during recovery");
                        request_rx_closed = true;
                        continue;
                    };
                    match request {
                        BarrierManagerRequest::GetBackfillProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::GetFragmentBackfillProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::GetCdcProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::AdhocRecovery(tx) => {
                            recover_txs.push(tx);
                        }
                        BarrierManagerRequest::UpdateDatabaseBarrier(request) => {
                            update_barrier_requests.push(request);
                        }
                        BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
                            // may recover snapshot backfill jobs
                            let _ = tx.send(true);
                        }
                    }
                }
            }
        };

        let duration = recovery_timer.stop_and_record();

        // Install the state produced by the successful attempt.
        (
            self.active_streaming_nodes,
            self.partial_graph_manager,
            self.checkpoint_control,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        // Apply barrier-config updates buffered during recovery to the new periodic barriers.
        for UpdateDatabaseBarrierRequest {
            database_id,
            barrier_interval_ms,
            checkpoint_frequency,
            sender,
        } in update_barrier_requests
        {
            self.periodic_barriers.update_database_barrier(
                database_id,
                barrier_interval_ms,
                checkpoint_frequency,
            );
            let _ = sender.send(());
        }

        // Ad-hoc recovery requests received meanwhile are satisfied by this recovery.
        for tx in recover_txs {
            let _ = tx.send(());
        }

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}