risingwave_meta/barrier/worker.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::pin::pin;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use arc_swap::ArcSwap;
24use futures::{TryFutureExt, pin_mut};
25use itertools::Itertools;
26use risingwave_common::catalog::DatabaseId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::system_param::{AdaptiveParallelismStrategy, PAUSE_ON_NEXT_BOOTSTRAP_KEY};
29use risingwave_meta_model::WorkerId;
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::meta::Recovery;
32use risingwave_pb::meta::subscribe_response::{Info, Operation};
33use thiserror_ext::AsReport;
34use tokio::select;
35use tokio::sync::mpsc;
36use tokio::sync::oneshot::{Receiver, Sender};
37use tokio::task::JoinHandle;
38use tonic::Status;
39use tracing::{Instrument, debug, error, info, warn};
40
41use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
42use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
43use crate::barrier::context::recovery::{RenderedDatabaseRuntimeInfo, render_runtime_info};
44use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
45use crate::barrier::info::InflightDatabaseInfo;
46use crate::barrier::rpc::{
47    DatabaseInitialBarrierCollector, database_partial_graphs, from_partial_graph_id,
48    merge_node_rpc_errors,
49};
50use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
51use crate::barrier::{
52    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
53    RecoveryReason, RescheduleContext, UpdateDatabaseBarrierRequest, schedule,
54};
55use crate::controller::scale::render_actor_assignments;
56use crate::error::MetaErrorInner;
57use crate::hummock::HummockManagerRef;
58use crate::manager::sink_coordination::SinkCoordinatorManager;
59use crate::manager::{
60    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
61    MetadataManager,
62};
63use crate::rpc::metrics::GLOBAL_META_METRICS;
64use crate::stream::{
65    GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef, build_reschedule_commands,
66};
67use crate::{MetaError, MetaResult};
68
/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
/// collect them, with monotonic increasing epoch numbers. On compute nodes, `LocalBarrierManager`
/// in `risingwave_stream` crate will serve these requests and dispatch them to source actors.
///
/// Configuration change in our system is achieved by the mutation in the barrier. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state machine,
/// accepting [`crate::barrier::command::Command`] that carries info to build `Mutation`. To keep the consistency between
/// barrier manager and meta store, some actions like "drop materialized view" or "create mv on mv"
/// must be done in barrier manager transactional using [`crate::barrier::command::Command`].
pub(super) struct GlobalBarrierWorker<C> {
    /// Enable recovery or not when failover.
    enable_recovery: bool,

    /// The queue of scheduled barriers.
    periodic_barriers: PeriodicBarriers,

    /// Whether per database failure isolation is enabled in system parameters.
    system_enable_per_database_isolation: bool,

    /// Strategy used when rendering adaptive parallelism; refreshed from the
    /// `SystemParamsChange` local notification in the event loop.
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,

    /// Shared worker context (recovery, mark-ready, runtime-info reload, ...).
    pub(super) context: Arc<C>,

    /// Meta server environment (system params, notification manager, id generators).
    env: MetaSrvEnv,

    /// Per-database checkpoint state; produces recovery/running transition events.
    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    /// Receiver for external requests (progress queries, adhoc recovery, ...).
    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    /// Tracked set of active streaming compute workers; changes drive
    /// `partial_graph_manager` add/remove in the event loop.
    active_streaming_nodes: ActiveStreamingWorkerNodes,

    /// Manages per-worker control streams and partial graphs; surfaces worker
    /// and partial-graph events consumed by the event loop.
    partial_graph_manager: PartialGraphManager,
}
106
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tokio::sync::oneshot;

    use super::*;
    // NOTE: `RescheduleContext` is already in scope via `use super::*` (imported
    // at the top of this file), so no explicit re-import is needed here.
    use crate::barrier::notifier::Notifier;

    /// With no active streaming workers, resolving a `RescheduleIntent` must not
    /// bubble the error out of the resolver: instead the resolver returns
    /// `Ok(None)` and routes the failure to the `started` notifier.
    #[tokio::test]
    async fn test_reschedule_intent_without_workers_notifies_start_failed() {
        let env = MetaSrvEnv::for_test().await;
        let database_id = DatabaseId::new(1);
        let database_info =
            InflightDatabaseInfo::empty(database_id, env.shared_actor_infos().clone());
        let (started_tx, started_rx) = oneshot::channel();
        let (_collected_tx, _collected_rx) = oneshot::channel();

        let notifier = Notifier {
            started: Some(started_tx),
            collected: Some(_collected_tx),
        };

        let new_barrier = schedule::NewBarrier {
            database_id,
            command: Some((
                Command::RescheduleIntent {
                    context: RescheduleContext::empty(),
                    reschedule_plan: None,
                },
                vec![notifier],
            )),
            span: tracing::Span::none(),
            checkpoint: false,
        };

        // Empty worker map makes `build_reschedule_from_context` fail.
        let result = resolve_reschedule_intent(
            env,
            HashMap::new(),
            AdaptiveParallelismStrategy::default(),
            Some(&database_info),
            new_barrier,
        );

        // No barrier is injected and the failure reaches the `started` channel.
        assert!(matches!(result, Ok(None)));
        let started = started_rx.await.expect("started notifier dropped");
        assert!(started.is_err());
    }
}
157
158impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
159    pub(super) async fn new_inner(
160        env: MetaSrvEnv,
161        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
162        context: Arc<C>,
163    ) -> Self {
164        let enable_recovery = env.opts.enable_recovery;
165
166        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();
167
168        let partial_graph_manager = PartialGraphManager::uninitialized(env.clone());
169
170        let reader = env.system_params_reader().await;
171        let system_enable_per_database_isolation = reader.per_database_isolation();
172        // Load config will be performed in bootstrap phase.
173        let periodic_barriers = PeriodicBarriers::default();
174        let adaptive_parallelism_strategy = reader.adaptive_parallelism_strategy();
175
176        let checkpoint_control = CheckpointControl::new(env.clone());
177        Self {
178            enable_recovery,
179            periodic_barriers,
180            system_enable_per_database_isolation,
181            adaptive_parallelism_strategy,
182            context,
183            env,
184            checkpoint_control,
185            completing_task: CompletingTask::None,
186            request_rx,
187            active_streaming_nodes,
188            partial_graph_manager,
189        }
190    }
191}
192
193fn resolve_reschedule_intent(
194    env: MetaSrvEnv,
195    worker_nodes: HashMap<WorkerId, WorkerNode>,
196    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
197    database_info: Option<&InflightDatabaseInfo>,
198    mut new_barrier: schedule::NewBarrier,
199) -> MetaResult<Option<schedule::NewBarrier>> {
200    let Some((command, notifiers)) = new_barrier.command.take() else {
201        return Ok(Some(new_barrier));
202    };
203
204    match command {
205        Command::RescheduleIntent {
206            context,
207            reschedule_plan,
208        } => {
209            if let Some(reschedule_plan) = reschedule_plan {
210                new_barrier.command = Some((
211                    Command::RescheduleIntent {
212                        context,
213                        reschedule_plan: Some(reschedule_plan),
214                    },
215                    notifiers,
216                ));
217                return Ok(Some(new_barrier));
218            }
219            let span = tracing::info_span!(
220                "resolve_reschedule_intent",
221                database_id = %new_barrier.database_id
222            );
223            let reschedule_plan = {
224                let _guard = span.enter();
225                build_reschedule_from_context(
226                    &env,
227                    worker_nodes,
228                    adaptive_parallelism_strategy,
229                    new_barrier.database_id,
230                    context,
231                    database_info.ok_or_else(|| {
232                        anyhow!(
233                            "database {} not found when resolving reschedule intent",
234                            new_barrier.database_id
235                        )
236                    })?,
237                )
238            };
239            match reschedule_plan {
240                Ok(Some(reschedule_plan)) => {
241                    new_barrier.command = Some((
242                        Command::RescheduleIntent {
243                            context: RescheduleContext::empty(),
244                            reschedule_plan: Some(reschedule_plan),
245                        },
246                        notifiers,
247                    ));
248                    Ok(Some(new_barrier))
249                }
250                Ok(None) => {
251                    // No-op intent: notify to unblock callers even though no barrier is injected.
252                    for mut notifier in notifiers {
253                        notifier.notify_started();
254                        notifier.notify_collected();
255                    }
256                    Ok(None)
257                }
258                Err(err) => {
259                    for notifier in notifiers {
260                        notifier.notify_start_failed(err.clone());
261                    }
262                    Ok(None)
263                }
264            }
265        }
266        _ => {
267            new_barrier.command = Some((command, notifiers));
268            Ok(Some(new_barrier))
269        }
270    }
271}
272
273fn build_reschedule_from_context(
274    env: &MetaSrvEnv,
275    worker_nodes: HashMap<WorkerId, WorkerNode>,
276    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
277    database_id: DatabaseId,
278    context: RescheduleContext,
279    database_info: &InflightDatabaseInfo,
280) -> MetaResult<Option<crate::barrier::ReschedulePlan>> {
281    if worker_nodes.is_empty() {
282        return Err(anyhow!("no active streaming workers for reschedule").into());
283    }
284
285    let actor_id_counter = env.actor_id_generator();
286    if context.is_empty() {
287        return Ok(None);
288    }
289
290    let rendered = render_actor_assignments(
291        actor_id_counter,
292        &worker_nodes,
293        adaptive_parallelism_strategy,
294        &context.loaded,
295    )?;
296
297    let all_prev_fragments = database_info
298        .fragment_infos()
299        .map(|fragment| (fragment.fragment_id, fragment))
300        .collect();
301    let mut commands = build_reschedule_commands(rendered.fragments, context, all_prev_fragments)?;
302    Ok(commands.remove(&database_id))
303}
304
305impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
306    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
307    pub async fn new(
308        scheduled_barriers: schedule::ScheduledBarriers,
309        env: MetaSrvEnv,
310        metadata_manager: MetadataManager,
311        hummock_manager: HummockManagerRef,
312        source_manager: SourceManagerRef,
313        sink_manager: SinkCoordinatorManager,
314        scale_controller: ScaleControllerRef,
315        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
316        barrier_scheduler: schedule::BarrierScheduler,
317        refresh_manager: GlobalRefreshManagerRef,
318    ) -> Self {
319        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));
320
321        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
322            scheduled_barriers,
323            status,
324            metadata_manager,
325            hummock_manager,
326            source_manager,
327            scale_controller,
328            env.clone(),
329            barrier_scheduler,
330            refresh_manager,
331            sink_manager,
332        ));
333
334        Self::new_inner(env, request_rx, context).await
335    }
336
337    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
338        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
339        let fut = (self.env.await_tree_reg())
340            .register_derived_root("Global Barrier Worker")
341            .instrument(self.run(shutdown_rx));
342        let join_handle = tokio::spawn(fut);
343
344        (join_handle, shutdown_tx)
345    }
346
347    /// Check whether we should pause on bootstrap from the system parameter and reset it.
348    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
349        let paused = self
350            .env
351            .system_params_reader()
352            .await
353            .pause_on_next_bootstrap()
354            || self.env.opts.pause_on_next_bootstrap_offline;
355
356        if paused {
357            warn!(
358                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
359                 It will now be reset to `false`. \
360                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
361                PAUSE_ON_NEXT_BOOTSTRAP_KEY
362            );
363            self.env
364                .system_params_manager_impl_ref()
365                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
366                .await?;
367        }
368        Ok(paused)
369    }
370
371    /// Start an infinite loop to take scheduled barriers and send them.
372    async fn run(mut self, shutdown_rx: Receiver<()>) {
373        tracing::info!(
374            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
375            self.enable_recovery,
376            self.checkpoint_control.in_flight_barrier_nums,
377        );
378
379        if !self.enable_recovery {
380            let job_exist = self
381                .context
382                .metadata_manager
383                .catalog_controller
384                .has_any_streaming_jobs()
385                .await
386                .unwrap();
387            if job_exist {
388                panic!(
389                    "Some streaming jobs already exist in meta, please start with recovery enabled \
390                or clean up the metadata using `./risedev clean-data`"
391                );
392            }
393        }
394
395        {
396            // Bootstrap recovery. Here we simply trigger a recovery process to achieve the
397            // consistency.
398            // Even if there's no actor to recover, we still go through the recovery process to
399            // inject the first `Initial` barrier.
400            let span = tracing::info_span!("bootstrap_recovery");
401            crate::telemetry::report_event(
402                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
403                "normal_recovery",
404                0,
405                None,
406                None,
407                None,
408            );
409
410            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
411
412            self.recovery(paused, RecoveryReason::Bootstrap)
413                .instrument(span)
414                .await;
415        }
416
417        Box::pin(self.run_inner(shutdown_rx)).await
418    }
419}
420
421impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
422    fn enable_per_database_isolation(&self) -> bool {
423        self.system_enable_per_database_isolation && {
424            if let Err(e) =
425                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
426            {
427                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
428                false
429            } else {
430                true
431            }
432        }
433    }
434
435    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
436        let (local_notification_tx, mut local_notification_rx) =
437            tokio::sync::mpsc::unbounded_channel();
438        self.env
439            .notification_manager()
440            .insert_local_sender(local_notification_tx);
441
442        // Start the event loop.
443        loop {
444            tokio::select! {
445                biased;
446
447                // Shutdown
448                _ = &mut shutdown_rx => {
449                    tracing::info!("Barrier manager is stopped");
450                    break;
451                }
452
453                request = self.request_rx.recv() => {
454                    if let Some(request) = request {
455                        match request {
456                            BarrierManagerRequest::GetBackfillProgress(result_tx) => {
457                                let progress = self.checkpoint_control.gen_backfill_progress();
458                                if result_tx.send(Ok(progress)).is_err() {
459                                    error!("failed to send get ddl progress");
460                                }
461                            }
462                            BarrierManagerRequest::GetFragmentBackfillProgress(result_tx) => {
463                                let progress =
464                                    self.checkpoint_control.gen_fragment_backfill_progress();
465                                if result_tx.send(Ok(progress)).is_err() {
466                                    error!("failed to send get fragment backfill progress");
467                                }
468                            }
469                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
470                                let progress = self.checkpoint_control.gen_cdc_progress();
471                                if result_tx.send(Ok(progress)).is_err() {
472                                    error!("failed to send get ddl progress");
473                                }
474                            }
475                            // Handle adhoc recovery triggered by user.
476                            BarrierManagerRequest::AdhocRecovery(sender) => {
477                                self.adhoc_recovery().await;
478                                if sender.send(()).is_err() {
479                                    warn!("failed to notify finish of adhoc recovery");
480                                }
481                            }
482                            BarrierManagerRequest::UpdateDatabaseBarrier( UpdateDatabaseBarrierRequest {
483                                database_id,
484                                barrier_interval_ms,
485                                checkpoint_frequency,
486                                sender,
487                            }) => {
488                                self.periodic_barriers
489                                    .update_database_barrier(
490                                        database_id,
491                                        barrier_interval_ms,
492                                        checkpoint_frequency,
493                                    );
494                                if sender.send(()).is_err() {
495                                    warn!("failed to notify finish of update database barrier");
496                                }
497                            }
498                            BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
499                                if tx.send(self.checkpoint_control.may_have_snapshot_backfilling_jobs()).is_err() {
500                                    warn!("failed to may have snapshot backfill job");
501                                }
502                            }
503                        }
504                    } else {
505                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
506                        return;
507                    }
508                }
509
510                changed_worker = self.active_streaming_nodes.changed() => {
511                    #[cfg(debug_assertions)]
512                    {
513                        self.active_streaming_nodes.validate_change().await;
514                    }
515
516                    info!(?changed_worker, "worker changed");
517
518                    match changed_worker {
519                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
520                            self.partial_graph_manager.add_worker(node, &*self.context).await;
521                        }
522                        ActiveStreamingWorkerChange::Remove(node) => {
523                            self.partial_graph_manager.remove_worker(node);
524                        }
525                    }
526                }
527
528                notification = local_notification_rx.recv() => {
529                    let notification = notification.unwrap();
530                    if let LocalNotification::SystemParamsChange(p) = notification {
531                        {
532                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
533                            self.periodic_barriers
534                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
535                            self.system_enable_per_database_isolation = p.per_database_isolation();
536                            self.adaptive_parallelism_strategy = p.adaptive_parallelism_strategy();
537                        }
538                    }
539                }
540                complete_result = self
541                    .completing_task
542                    .next_completed_barrier(
543                        &mut self.periodic_barriers,
544                        &mut self.checkpoint_control,
545                        &mut self.partial_graph_manager,
546                        &self.context,
547                        &self.env,
548                ) => {
549                    match complete_result {
550                        Ok(output) => {
551                            self.checkpoint_control.ack_completed(output);
552                        }
553                        Err(e) => {
554                            self.failure_recovery(e).await;
555                        }
556                    }
557                },
558                event = self.checkpoint_control.next_event() => {
559                    let result: MetaResult<()> = try {
560                        match event {
561                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
562                                let database_id = entering_initializing.database_id();
563                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
564                                    resp.root_err.as_ref().map(|root_err| {
565                                        (*worker_id, ScoredError {
566                                            error: Status::internal(&root_err.err_msg),
567                                            score: Score(root_err.score)
568                                        })
569                                    })
570                                }));
571                                Self::report_collect_failure(&self.env, &error);
572                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
573                                let result: MetaResult<_> = try {
574                                    let runtime_info = self.context.reload_database_runtime_info(database_id).await.inspect_err(|err| {
575                                        warn!(%database_id, err = %err.as_report(), "reload runtime info failed");
576                                    })?;
577                                    let rendered_info = render_runtime_info(
578                                        self.env.actor_id_generator(),
579                                        &self.active_streaming_nodes,
580                                        self.adaptive_parallelism_strategy,
581                                        &runtime_info.recovery_context,
582                                        database_id,
583                                    )
584                                    .inspect_err(|err: &MetaError| {
585                                        warn!(%database_id, err = %err.as_report(), "render runtime info failed");
586                                    })?;
587                                    if let Some(rendered_info) = rendered_info {
588                                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
589                                            database_id,
590                                            &rendered_info.job_infos,
591                                            &self.active_streaming_nodes,
592                                            &rendered_info.stream_actors,
593                                            &runtime_info.state_table_committed_epochs,
594                                        )
595                                        .inspect_err(|err| {
596                                            warn!(%database_id, err = ?err.as_report(), "database runtime info failed validation");
597                                        })?;
598                                        Some((runtime_info, rendered_info))
599                                    } else {
600                                        None
601                                    }
602                                };
603                                match result {
604                                    Ok(Some((runtime_info, rendered_info))) => {
605                                        entering_initializing.enter(
606                                            runtime_info,
607                                            rendered_info,
608                                            &mut self.partial_graph_manager,
609                                        );
610                                    }
611                                    Ok(None) => {
612                                        info!(%database_id, "database removed after reloading empty runtime info");
613                                        // mark ready to unblock subsequent request
614                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
615                                        entering_initializing.remove();
616                                    }
617                                    Err(e) => {
618                                        entering_initializing.fail_reload_runtime_info(e);
619                                    }
620                                }
621                            }
622                            CheckpointControlEvent::EnteringRunning(entering_running) => {
623                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
624                                entering_running.enter();
625                            }
626                        }
627                    };
628                    if let Err(e) = result {
629                        self.failure_recovery(e).await;
630                    }
631                }
632                event = self.partial_graph_manager.next_event(&self.context) => {
633                    let result: MetaResult<()> = try {
634                        match event {
635                            PartialGraphManagerEvent::Worker(_worker_id, WorkerEvent::WorkerConnected) => {
636                                // no handling on new worker connected event yet
637                            }
638                            PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
639                                let failed_databases = self
640                                    .checkpoint_control
641                                    .databases_failed_at_worker_err(worker_id)
642                                    .chain(
643                                        affected_partial_graphs
644                                        .into_iter()
645                                        .map(|partial_graph_id| {
646                                            let (database_id, _) = from_partial_graph_id(partial_graph_id);
647                                            database_id
648                                        })
649                                    )
650                                    .collect::<HashSet<_>>();
651                                if !failed_databases.is_empty() {
652                                    if !self.enable_recovery {
653                                        panic!("control stream to worker {} failed but recovery not enabled: {}", worker_id, err.as_report());
654                                    }
655                                    if !self.enable_per_database_isolation() {
656                                        Err(err.clone())?;
657                                    }
658                                    Self::report_collect_failure(&self.env, &err);
659                                    for database_id in failed_databases {
660                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
661                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
662                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
663                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
664                                            // TODO: add log on blocking time
665                                            let output = self.completing_task.wait_completing_task().await?;
666                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
667                                        }
668                                    }
669                                }  else {
670                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
671                                }
672                                continue;
673                            }
674                            PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
675                                let (database_id, _creating_job_id) = from_partial_graph_id(partial_graph_id);
676                                match event {
677                                    PartialGraphEvent::BarrierCollected(collected_barrier) => {
678                                        self.checkpoint_control.barrier_collected(partial_graph_id, collected_barrier, &mut self.periodic_barriers)?;
679                                    }
680                                    PartialGraphEvent::Error(worker_id) => {
681                                        if !self.enable_recovery {
682                                            panic!("database {database_id} failure reported from {worker_id} but recovery not enabled")
683                                        }
684                                        if !self.enable_per_database_isolation() {
685                                                Err(anyhow!("database {database_id} report failure from {worker_id}"))?;
686                                            }
687                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
688                                            warn!(%database_id, "database entering recovery");
689                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
690                                            // TODO: add log on blocking time
691                                            let output = self.completing_task.wait_completing_task().await?;
692                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
693                                        }
694                                    }
695                                    PartialGraphEvent::Reset(reset_resps) => {
696                                        self.checkpoint_control.on_partial_graph_reset(partial_graph_id, reset_resps);
697                                    }
698                                    PartialGraphEvent::Initialized => {
699                                        self.checkpoint_control.on_partial_graph_initialized(partial_graph_id);
700                                    }
701                                }
702                            }
703                        };
704                    };
705                    if let Err(e) = result {
706                        self.failure_recovery(e).await;
707                    }
708                }
709                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
710                    let database_id = new_barrier.database_id;
711                    let new_barrier = if matches!(
712                        new_barrier.command,
713                        Some((Command::RescheduleIntent { .. }, _))
714                    ) {
715                        let env = self.env.clone();
716                        let worker_nodes = self
717                            .active_streaming_nodes
718                            .current()
719                            .iter()
720                            .map(|(worker_id, worker)| (*worker_id, worker.clone()))
721                            .collect();
722                        let adaptive_parallelism_strategy = self.adaptive_parallelism_strategy;
723                        let database_info = self.checkpoint_control.database_info(database_id);
724                        match resolve_reschedule_intent(
725                            env,
726                            worker_nodes,
727                            adaptive_parallelism_strategy,
728                            database_info,
729                            new_barrier,
730                        ) {
731                            Ok(Some(new_barrier)) => new_barrier,
732                            Ok(None) => continue,
733                            Err(err) => {
734                                self.failure_recovery(err).await;
735                                continue;
736                            }
737                        }
738                    } else {
739                        new_barrier
740                    };
741                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.partial_graph_manager) {
742                        if !self.enable_recovery {
743                            panic!(
744                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
745                                    database_id,
746                                    e.as_report()
747                                )
748                            );
749                        }
750                        let result: MetaResult<_> = try {
751                            if !self.enable_per_database_isolation() {
752                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
753                                Err(err)?;
754                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
755                                warn!(%database_id, e = %e.as_report(),"database entering recovery on inject failure");
756                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
757                                // TODO: add log on blocking time
758                                let output = self.completing_task.wait_completing_task().await?;
759                                entering_recovery.enter(output, &mut self.partial_graph_manager);
760                            }
761                        };
762                        if let Err(e) = result {
763                            self.failure_recovery(e).await;
764                        }
765                    }
766                }
767            }
768            self.checkpoint_control.update_barrier_nums_metrics();
769        }
770    }
771}
772
impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// We need to make sure there are no changes when doing recovery
    ///
    /// Drains in-flight barrier-completion work before recovery resets state:
    /// 1. Joins the spawned completing task (if any) and acks its result on success.
    /// 2. If step 1 did not fail, synchronously completes all remaining pending
    ///    collected barriers, stopping at the first failure.
    /// 3. Forwards `err` to `CheckpointControl::clear_on_err` to clear per-database state.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // join spawned completing command to finish no matter it succeeds or not.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            // No task was completing; nothing to wait for.
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    // The tokio task itself failed to join (e.g. panicked).
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    // The task ran but barrier completion returned an error.
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    // Completion succeeded: ack the epochs so checkpoint state stays consistent.
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            // A previous completion already failed; skip draining below.
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // continue to finish the pending collected barrier.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        // Stop draining on the first failure; remaining pending barriers
                        // are discarded by the clear below.
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}
837
838impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
839    /// Set barrier manager status.
840    async fn failure_recovery(&mut self, err: MetaError) {
841        self.clear_on_err(&err).await;
842
843        if self.enable_recovery {
844            let span = tracing::info_span!(
845                "failure_recovery",
846                error = %err.as_report(),
847            );
848
849            crate::telemetry::report_event(
850                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
851                "failure_recovery",
852                0,
853                None,
854                None,
855                None,
856            );
857
858            let reason = RecoveryReason::Failover(err);
859
860            // No need to clean dirty tables for barrier recovery,
861            // The foreground stream job should cleanup their own tables.
862            self.recovery(false, reason).instrument(span).await;
863        } else {
864            panic!(
865                "a streaming error occurred while recovery is disabled, aborting: {:?}",
866                err.as_report()
867            );
868        }
869    }
870
871    async fn adhoc_recovery(&mut self) {
872        let err = MetaErrorInner::AdhocRecovery.into();
873        self.clear_on_err(&err).await;
874
875        let span = tracing::info_span!(
876            "adhoc_recovery",
877            error = %err.as_report(),
878        );
879
880        crate::telemetry::report_event(
881            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
882            "adhoc_recovery",
883            0,
884            None,
885            None,
886            None,
887        );
888
889        // No need to clean dirty tables for barrier recovery,
890        // The foreground stream job should cleanup their own tables.
891        self.recovery(false, RecoveryReason::Adhoc)
892            .instrument(span)
893            .await;
894    }
895}
896
897impl<C> GlobalBarrierWorker<C> {
898    /// Send barrier-complete-rpc and wait for responses from all CNs
899    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
900        // Record failure in event log.
901        use risingwave_pb::meta::event_log;
902        let event = event_log::EventCollectBarrierFail {
903            error: error.to_report_string(),
904        };
905        env.event_log_manager_ref()
906            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
907    }
908}
909
910mod retry_strategy {
911    use std::time::Duration;
912
913    use tokio_retry::strategy::{ExponentialBackoff, jitter};
914
915    // Retry base interval in milliseconds.
916    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
917    // Retry max interval.
918    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);
919
920    // MrCroxx: Use concrete type here to prevent unsolved compiler issue.
921    // Feel free to replace the concrete type with TAIT after fixed.
922
923    // mod retry_backoff_future {
924    //     use std::future::Future;
925    //     use std::time::Duration;
926    //
927    //     use tokio::time::sleep;
928    //
929    //     pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
930    //
931    //     #[define_opaque(RetryBackoffFuture)]
932    //     pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
933    //         Box::pin(sleep(duration))
934    //     }
935    // }
936    // pub(crate) use retry_backoff_future::*;
937
938    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;
939
940    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
941        Box::pin(tokio::time::sleep(duration))
942    }
943
944    pub(crate) type RetryBackoffStrategy =
945        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;
946
947    /// Initialize a retry strategy for operation in recovery.
948    #[inline(always)]
949    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
950        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
951            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
952            .map(jitter)
953    }
954
955    #[define_opaque(RetryBackoffStrategy)]
956    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
957        get_retry_strategy().map(get_retry_backoff_future)
958    }
959}
960
961pub(crate) use retry_strategy::*;
962use risingwave_common::error::tonic::extra::{Score, ScoredError};
963use risingwave_pb::meta::event_log::{Event, EventRecovery};
964
965use crate::barrier::partial_graph::{
966    PartialGraphEvent, PartialGraphManager, PartialGraphManagerEvent, WorkerEvent,
967};
968
969impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
970    /// Recovery the whole cluster from the latest epoch.
971    ///
    /// If `is_paused` is `true`, all data sources (including connectors and DMLs) will be
    /// immediately paused after recovery, until the user manually resumes them either by
    /// restarting the cluster or via `risectl` command. Used for debugging purpose.
    ///
    /// On success, the worker's internal state (checkpoint control, partial graph
    /// manager, etc.) is replaced in place with the recovered state.
977    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
978        // Clear all control streams to release resources (connections to compute nodes) first.
979        self.partial_graph_manager.clear_worker();
980
981        let reason_str = match &recovery_reason {
982            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
983            RecoveryReason::Failover(err) => {
984                format!("failed over: {}", err.as_report())
985            }
986            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
987        };
988        self.context.abort_and_mark_blocked(None, recovery_reason);
989
990        self.recovery_inner(is_paused, reason_str).await;
991        self.context.mark_ready(MarkReadyOptions::Global {
992            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
993        });
994    }
995
996    #[await_tree::instrument("recovery({recovery_reason})")]
997    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
998        let event_log_manager_ref = self.env.event_log_manager_ref();
999
1000        tracing::info!("recovery start!");
1001        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1002            EventRecovery::global_recovery_start(recovery_reason.clone()),
1003        )]);
1004
1005        let retry_strategy = get_retry_strategy();
1006
1007        // We take retry into consideration because this is the latency user sees for a cluster to
1008        // get recovered.
1009        let recovery_timer = GLOBAL_META_METRICS
1010            .recovery_latency
1011            .with_label_values(&["global"])
1012            .start_timer();
1013
1014        let enable_per_database_isolation = self.enable_per_database_isolation();
1015
1016        let recovery_future = tokio_retry::Retry::spawn(retry_strategy, || async {
1017            self.env.stream_client_pool().invalidate_all();
1018            // We need to notify_creating_job_failed in every recovery retry, because in outer create_streaming_job handler,
1019            // it holds the reschedule_read_lock and wait for creating job to finish, and caused the following scale_actor fail
1020            // to acquire the reschedule_write_lock, and then keep recovering, and then deadlock.
1021            // TODO: refactor and fix this hacky implementation.
1022            self.context
1023                .notify_creating_job_failed(None, recovery_reason.clone())
1024                .await;
1025
1026            let runtime_info_snapshot = self
1027                .context
1028                .reload_runtime_info()
1029                .await?;
1030            let BarrierWorkerRuntimeInfoSnapshot {
1031                active_streaming_nodes,
1032                recovery_context,
1033                mut state_table_committed_epochs,
1034                mut state_table_log_epochs,
1035                mut mv_depended_subscriptions,
1036                mut background_jobs,
1037                hummock_version_stats,
1038                database_infos,
1039                mut cdc_table_snapshot_splits,
1040            } = runtime_info_snapshot;
1041
1042            let mut partial_graph_manager = PartialGraphManager::recover(
1043                    self.env.clone(),
1044                    active_streaming_nodes.current(),
1045                    self.context.clone(),
1046                )
1047                .await;
1048            {
1049                let mut empty_databases = HashSet::new();
1050                let mut collected_databases = HashMap::new();
1051                let mut collecting_databases = HashMap::new();
1052                let mut failed_databases = HashMap::new();
1053                for &database_id in recovery_context.fragment_context.database_map.keys() {
1054                    let mut recoverer = partial_graph_manager.start_recover();
1055                    let result: MetaResult<_> = try {
1056                        let Some(rendered_info) = render_runtime_info(
1057                            self.env.actor_id_generator(),
1058                            &active_streaming_nodes,
1059                            self.adaptive_parallelism_strategy,
1060                            &recovery_context,
1061                            database_id,
1062                        )
1063                            .inspect_err(|err: &MetaError| {
1064                                warn!(%database_id, err = %err.as_report(), "render runtime info failed");
1065                            })? else {
1066                            empty_databases.insert(database_id);
1067                            continue;
1068                        };
1069                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
1070                            database_id,
1071                            &rendered_info.job_infos,
1072                            &active_streaming_nodes,
1073                            &rendered_info.stream_actors,
1074                            &state_table_committed_epochs,
1075                        )
1076                        .inspect_err(|err| {
1077                            warn!(%database_id, err = %err.as_report(), "rendered runtime info failed validation");
1078                        })?;
1079                        let RenderedDatabaseRuntimeInfo {
1080                            job_infos,
1081                            stream_actors,
1082                            mut source_splits,
1083                        } = rendered_info;
1084                        recoverer.inject_database_initial_barrier(
1085                            database_id,
1086                            job_infos,
1087                            &recovery_context.job_extra_info,
1088                            &mut state_table_committed_epochs,
1089                            &mut state_table_log_epochs,
1090                            &recovery_context.fragment_relations,
1091                            &stream_actors,
1092                            &mut source_splits,
1093                            &mut background_jobs,
1094                            &mut mv_depended_subscriptions,
1095                            is_paused,
1096                            &hummock_version_stats,
1097                            &mut cdc_table_snapshot_splits,
1098                        )?
1099                    };
1100                    let collector = match result {
1101                        Ok(database) => {
1102                            DatabaseInitialBarrierCollector {
1103                                database_id,
1104                                initializing_partial_graphs: recoverer.all_initializing(),
1105                                database,
1106                            }
1107                        }
1108                        Err(e) => {
1109                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
1110                            assert!(failed_databases.insert(database_id, recoverer.failed()).is_none(), "non-duplicate");
1111                            continue;
1112                        }
1113                    };
1114                    if !collector.is_collected() {
1115                        assert!(collecting_databases.insert(database_id, collector).is_none());
1116                    } else {
1117                        warn!(%database_id, "database has no node to inject initial barrier");
1118                        assert!(collected_databases.insert(database_id, collector.finish()).is_none());
1119                    }
1120                }
1121                if !empty_databases.is_empty() {
1122                    info!(?empty_databases, "empty database in global recovery");
1123                }
1124                while !collecting_databases.is_empty() {
1125                    match partial_graph_manager.next_event(&self.context).await {
1126                        PartialGraphManagerEvent::Worker(_, WorkerEvent::WorkerConnected) => {
1127                            // not handle WorkerConnected yet
1128                        }
1129                        PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
1130                            let affected_databases: HashSet<_> = affected_partial_graphs.into_iter().map(|partial_graph_id| {
1131                                let (database_id, _) = from_partial_graph_id(partial_graph_id);
1132                                database_id
1133                            }).collect();
1134                            warn!(%worker_id, err = %err.as_report(), "worker node failure during recovery");
1135                            for (failed_database_id, collector) in collecting_databases.extract_if(|database_id, collector| {
1136                                !collector.is_valid_after_worker_err(worker_id) || affected_databases.contains(database_id)
1137                            }) {
1138                                warn!(%failed_database_id, %worker_id, "database failed to recovery in global recovery due to worker node err");
1139                                let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
1140                                partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1141                                assert!(failed_databases.insert(failed_database_id, resetting_partial_graphs).is_none());
1142                            }
1143                        }
1144                        PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
1145                            match event {
1146                                PartialGraphEvent::BarrierCollected(_) => {
1147                                    unreachable!("no barrier collected event on initializing")
1148                                }
1149                                PartialGraphEvent::Reset(_) => {
1150                                    unreachable!("no partial graph reset on initializing")
1151                                }
1152                                PartialGraphEvent::Error(worker_id) => {
1153                                    let (database_id, _) = from_partial_graph_id(partial_graph_id);
1154                                    if let Some(collector) = collecting_databases.remove(&database_id) {
1155                                        warn!(%database_id, %worker_id, "database reset during global recovery");
1156                                        let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
1157                                        partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1158                                        assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
1159                                    } else if let Some(database) = collected_databases.remove(&database_id) {
1160                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
1161                                        let resetting_partial_graphs: HashSet<_> = database_partial_graphs(database_id, database.creating_streaming_job_controls.keys().copied()).collect();
1162                                        partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1163                                        assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
1164                                    } else {
1165                                        assert!(failed_databases.contains_key(&database_id));
1166                                    }
1167                                }
1168                                PartialGraphEvent::Initialized => {
1169                                    let (database_id, _) = from_partial_graph_id(partial_graph_id);
1170                                    if failed_databases.contains_key(&database_id) {
1171                                        assert!(!collecting_databases.contains_key(&database_id));
1172                                        // ignore the lately initialized partial graph of failed database
1173                                        continue;
1174                                    }
1175                                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
1176                                        unreachable!("should exist")
1177                                    };
1178                                    let collector = entry.get_mut();
1179                                    collector.partial_graph_initialized(partial_graph_id);
1180                                    if collector.is_collected() {
1181                                        let collector = entry.remove();
1182                                        assert!(collected_databases.insert(database_id, collector.finish()).is_none());
1183                                    }
1184                                }
1185                            }
1186                        }
1187                    }
1188                }
1189                debug!("collected initial barrier");
1190                if !background_jobs.is_empty() {
1191                    warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background mview in recovery");
1192                }
1193                if !mv_depended_subscriptions.is_empty() {
1194                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
1195                }
1196                if !state_table_committed_epochs.is_empty() {
1197                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
1198                }
1199                if !enable_per_database_isolation && !failed_databases.is_empty() {
1200                    return Err(anyhow!(
1201                        "global recovery failed due to failure of databases {:?}",
1202                        failed_databases.keys().collect_vec()).into()
1203                    );
1204                }
1205                let checkpoint_control = CheckpointControl::recover(
1206                    collected_databases,
1207                    failed_databases,
1208                    hummock_version_stats,
1209                    self.env.clone(),
1210                );
1211
1212                let reader = self.env.system_params_reader().await;
1213                let checkpoint_frequency = reader.checkpoint_frequency();
1214                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
1215                let periodic_barriers = PeriodicBarriers::new(
1216                    barrier_interval,
1217                    checkpoint_frequency,
1218                    database_infos,
1219                );
1220
1221                Ok((
1222                    active_streaming_nodes,
1223                    partial_graph_manager,
1224                    checkpoint_control,
1225                    periodic_barriers,
1226                ))
1227            }
1228        }.inspect_err(|err: &MetaError| {
            // Failure path of a single recovery attempt (tail of the retrying
            // recovery future built above): log it, persist a recovery-failure
            // event, and bump the global failure counter. The attempt itself is
            // retried — see the `expect` on the future's output below.
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"));

        // Requests that arrive while recovery is in flight are either answered
        // immediately with an "under recovery" error or buffered here and
        // replayed once the new state is installed.
        let mut recover_txs = vec![];
        let mut update_barrier_requests = vec![];
        // Pin the recovery future on the stack so it can be polled repeatedly
        // through `&mut` in the `select!` loop below.
        pin_mut!(recovery_future);
        // Once `request_rx` yields `None` the channel is closed; disable that
        // select arm so we don't keep polling a closed receiver.
        let mut request_rx_closed = false;
        let new_state = loop {
            select! {
                // `biased`: always check recovery completion before draining
                // incoming requests.
                biased;
                new_state = &mut recovery_future => {
                    // The future retries internally on failure, so an `Err`
                    // here would be a bug — hence `expect`.
                    break new_state.expect("Retry until recovery success.");
                }
                request = pin!(self.request_rx.recv()), if !request_rx_closed => {
                    let Some(request) = request else {
                        warn!("request rx channel closed during recovery");
                        request_rx_closed = true;
                        continue;
                    };
                    match request {
                        // Progress queries cannot be served mid-recovery:
                        // fail them fast with the recovery reason attached.
                        BarrierManagerRequest::GetBackfillProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::GetFragmentBackfillProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::GetCdcProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        // An ad-hoc recovery request is already satisfied by the
                        // recovery in progress; ack it once we finish (below).
                        BarrierManagerRequest::AdhocRecovery(tx) => {
                            recover_txs.push(tx);
                        }
                        // Defer per-database barrier-config updates until the
                        // recovered `periodic_barriers` is installed.
                        BarrierManagerRequest::UpdateDatabaseBarrier(request) => {
                            update_barrier_requests.push(request);
                        }
                        BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
                            // Conservatively answer `true`: recovery may bring
                            // back snapshot backfill jobs we can't see yet.
                            let _ = tx.send(true);
                        }
                    }
                }
            }
        };

        // Stop the recovery latency timer and record the elapsed seconds
        // (also reported in the success event below).
        let duration = recovery_timer.stop_and_record();

        // Install the freshly recovered worker state in one destructuring
        // assignment so all four fields switch over together.
        (
            self.active_streaming_nodes,
            self.partial_graph_manager,
            self.checkpoint_control,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        // Replay the barrier-config updates buffered during recovery, acking
        // each caller once applied.
        for UpdateDatabaseBarrierRequest {
            database_id,
            barrier_interval_ms,
            checkpoint_frequency,
            sender,
        } in update_barrier_requests
        {
            self.periodic_barriers.update_database_barrier(
                database_id,
                barrier_interval_ms,
                checkpoint_frequency,
            );
            let _ = sender.send(());
        }

        // Ack the buffered ad-hoc recovery requests; send errors are ignored
        // since the requester may have given up waiting.
        for tx in recover_txs {
            let _ = tx.send(());
        }

        // Snapshot which databases came back running vs. still recovering,
        // for the success event log.
        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                // NOTE(review): f64 -> f32 narrowing; acceptable for a
                // latency-seconds field in an event log.
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        // Broadcast recovery completion so frontends and compute nodes can
        // resume / re-sync.
        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
1335}