Skip to main content

risingwave_meta/barrier/
worker.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::pin::pin;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use arc_swap::ArcSwap;
24use futures::{TryFutureExt, pin_mut};
25use itertools::Itertools;
26use risingwave_common::catalog::DatabaseId;
27use risingwave_common::id::JobId;
28use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
29use risingwave_common::system_param::reader::SystemParamsRead;
30use risingwave_meta_model::WorkerId;
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::meta::Recovery;
33use risingwave_pb::meta::subscribe_response::{Info, Operation};
34use thiserror_ext::AsReport;
35use tokio::select;
36use tokio::sync::mpsc;
37use tokio::sync::oneshot::{Receiver, Sender};
38use tokio::task::JoinHandle;
39use tonic::Status;
40use tracing::{Instrument, debug, error, info, warn};
41
42use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
43use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
44use crate::barrier::context::recovery::{RenderedDatabaseRuntimeInfo, render_runtime_info};
45use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
46use crate::barrier::info::InflightDatabaseInfo;
47use crate::barrier::rpc::{
48    DatabaseInitialBarrierCollector, database_partial_graphs, from_partial_graph_id,
49    merge_node_rpc_errors,
50};
51use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
52use crate::barrier::{
53    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
54    RecoveryReason, RescheduleContext, UpdateDatabaseBarrierRequest, schedule,
55};
56use crate::controller::scale::{materialize_actor_assignments, preview_actor_assignments};
57use crate::error::MetaErrorInner;
58use crate::hummock::HummockManagerRef;
59use crate::manager::iceberg_v3_sink::IcebergV3SinkManager;
60use crate::manager::sink_coordination::SinkCoordinatorManager;
61use crate::manager::{
62    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
63    MetadataManager,
64};
65use crate::rpc::metrics::GLOBAL_META_METRICS;
66use crate::stream::{
67    GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef, build_reschedule_commands,
68    rendered_layout_matches_current,
69};
70use crate::{MetaError, MetaResult};
71
/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
/// collects them, with monotonically increasing epoch numbers. On compute nodes, `LocalBarrierManager`
/// in `risingwave_stream` crate will serve these requests and dispatch them to source actors.
///
/// Configuration change in our system is achieved by the mutation in the barrier. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state machine,
/// accepting [`crate::barrier::command::Command`] that carries info to build `Mutation`. To keep the consistency between
/// barrier manager and meta store, some actions like "drop materialized view" or "create mv on mv"
/// must be done in barrier manager transactionally using [`crate::barrier::command::Command`].
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether recovery is enabled on failure. When disabled, a failure reported by a worker
    /// panics instead of triggering recovery, and startup panics if streaming jobs already
    /// exist in meta.
    enable_recovery: bool,

    /// Periodic barrier scheduling state: the system-wide barrier interval and checkpoint
    /// frequency (refreshed on system parameter changes) plus per-database overrides.
    /// Default-constructed; its configuration is loaded during bootstrap.
    periodic_barriers: PeriodicBarriers,

    /// Whether per database failure isolation is enabled in system parameters. Refreshed on
    /// `SystemParamsChange` notifications. The effective setting additionally requires the
    /// `DatabaseFailureIsolation` license feature (see `enable_per_database_isolation`).
    system_enable_per_database_isolation: bool,

    /// Shared worker context, used to reach the rest of the meta service (e.g. the metadata
    /// manager, marking databases ready, notifying creating jobs of failures).
    pub(super) context: Arc<C>,

    /// Meta server environment; system parameters, the notification manager and the actor id
    /// generator are obtained from it.
    env: MetaSrvEnv,

    /// Per-database barrier/checkpoint state. Produces `CheckpointControlEvent`s for the event
    /// loop and answers backfill / CDC progress queries.
    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    /// Requests from other meta components (progress queries, adhoc recovery, per-database
    /// barrier configuration updates), served by the event loop.
    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    /// Active streaming worker nodes. Starts uninitialized; every add/update/remove change is
    /// forwarded to `partial_graph_manager`.
    active_streaming_nodes: ActiveStreamingWorkerNodes,

    /// Manages the streaming workers added/removed following `active_streaming_nodes`, and
    /// emits worker events (connected, error) and partial-graph events (barrier collected,
    /// error) to the event loop.
    partial_graph_manager: PartialGraphManager,
}
107
108#[cfg(test)]
109mod tests {
110    use std::collections::HashMap;
111
112    use tokio::sync::oneshot;
113
114    use super::*;
115    use crate::barrier::RescheduleContext;
116    use crate::barrier::notifier::Notifier;
117
118    #[tokio::test]
119    async fn test_reschedule_intent_without_workers_notifies_start_failed() {
120        let env = MetaSrvEnv::for_test().await;
121        let database_id = DatabaseId::new(1);
122        let database_info =
123            InflightDatabaseInfo::empty(database_id, env.shared_actor_infos().clone());
124        let (started_tx, started_rx) = oneshot::channel();
125        let (_collected_tx, _collected_rx) = oneshot::channel();
126
127        let notifier = Notifier {
128            started: Some(started_tx),
129            collected: Some(_collected_tx),
130        };
131
132        let new_barrier = schedule::NewBarrier {
133            database_id,
134            command: Some((
135                Command::RescheduleIntent {
136                    context: RescheduleContext::empty(),
137                    reschedule_plan: None,
138                },
139                vec![notifier],
140            )),
141            span: tracing::Span::none(),
142            checkpoint: false,
143        };
144
145        let result =
146            resolve_reschedule_intent(env, HashMap::new(), Some(&database_info), new_barrier);
147
148        assert!(matches!(result, Ok(None)));
149        let started = started_rx.await.expect("started notifier dropped");
150        assert!(started.is_err());
151    }
152}
153
154impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
155    pub(super) async fn new_inner(
156        env: MetaSrvEnv,
157        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
158        context: Arc<C>,
159    ) -> Self {
160        let enable_recovery = env.opts.enable_recovery;
161
162        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();
163
164        let partial_graph_manager = PartialGraphManager::uninitialized(env.clone());
165
166        let reader = env.system_params_reader().await;
167        let system_enable_per_database_isolation = reader.per_database_isolation();
168        // Load config will be performed in bootstrap phase.
169        let periodic_barriers = PeriodicBarriers::default();
170
171        let checkpoint_control = CheckpointControl::new(env.clone());
172        Self {
173            enable_recovery,
174            periodic_barriers,
175            system_enable_per_database_isolation,
176            context,
177            env,
178            checkpoint_control,
179            completing_task: CompletingTask::None,
180            request_rx,
181            active_streaming_nodes,
182            partial_graph_manager,
183        }
184    }
185}
186
187fn resolve_reschedule_intent(
188    env: MetaSrvEnv,
189    worker_nodes: HashMap<WorkerId, WorkerNode>,
190    database_info: Option<&InflightDatabaseInfo>,
191    mut new_barrier: schedule::NewBarrier,
192) -> MetaResult<Option<schedule::NewBarrier>> {
193    let Some((command, notifiers)) = new_barrier.command.take() else {
194        return Ok(Some(new_barrier));
195    };
196
197    match command {
198        Command::RescheduleIntent {
199            context,
200            reschedule_plan,
201        } => {
202            if let Some(reschedule_plan) = reschedule_plan {
203                new_barrier.command = Some((
204                    Command::RescheduleIntent {
205                        context,
206                        reschedule_plan: Some(reschedule_plan),
207                    },
208                    notifiers,
209                ));
210                return Ok(Some(new_barrier));
211            }
212            let span = tracing::info_span!(
213                "resolve_reschedule_intent",
214                database_id = %new_barrier.database_id
215            );
216            let reschedule_plan = {
217                let _guard = span.enter();
218                build_reschedule_from_context(
219                    &env,
220                    worker_nodes,
221                    new_barrier.database_id,
222                    context,
223                    database_info.ok_or_else(|| {
224                        anyhow!(
225                            "database {} not found when resolving reschedule intent",
226                            new_barrier.database_id
227                        )
228                    })?,
229                )
230            };
231            match reschedule_plan {
232                Ok(Some(reschedule_plan)) => {
233                    new_barrier.command = Some((
234                        Command::RescheduleIntent {
235                            context: RescheduleContext::empty(),
236                            reschedule_plan: Some(reschedule_plan),
237                        },
238                        notifiers,
239                    ));
240                    Ok(Some(new_barrier))
241                }
242                Ok(None) => {
243                    // No-op intent: notify to unblock callers even though no barrier is injected.
244                    for mut notifier in notifiers {
245                        notifier.notify_started();
246                        notifier.notify_collected();
247                    }
248                    Ok(None)
249                }
250                Err(err) => {
251                    for notifier in notifiers {
252                        notifier.notify_start_failed(err.clone());
253                    }
254                    Ok(None)
255                }
256            }
257        }
258        _ => {
259            new_barrier.command = Some((command, notifiers));
260            Ok(Some(new_barrier))
261        }
262    }
263}
264
265fn build_reschedule_from_context(
266    env: &MetaSrvEnv,
267    worker_nodes: HashMap<WorkerId, WorkerNode>,
268    database_id: DatabaseId,
269    context: RescheduleContext,
270    database_info: &InflightDatabaseInfo,
271) -> MetaResult<Option<crate::barrier::ReschedulePlan>> {
272    if worker_nodes.is_empty() {
273        return Err(anyhow!("no active streaming workers for reschedule").into());
274    }
275
276    if context.is_empty() {
277        return Ok(None);
278    }
279
280    // Barrier worker resolves this intent against a stable in-flight snapshot.
281    // Reuse the same fragment view for preview comparison and command building.
282    let all_prev_fragments = database_info
283        .fragment_infos()
284        .map(|fragment| (fragment.fragment_id, fragment))
285        .collect();
286
287    let previewed = preview_actor_assignments(&worker_nodes, &context.loaded)?;
288
289    if rendered_layout_matches_current(&previewed.fragments, &all_prev_fragments)? {
290        return Ok(None);
291    }
292
293    let actor_id_counter = env.actor_id_generator();
294    // Materialization only replaces preview actor ids with real ids. Worker
295    // placement, vnode ownership, and split assignment remain unchanged.
296    let rendered = materialize_actor_assignments(actor_id_counter, previewed);
297    let mut commands = build_reschedule_commands(rendered.fragments, context, all_prev_fragments)?;
298    Ok(commands.remove(&database_id))
299}
300
301impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
302    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
303    #[expect(clippy::too_many_arguments)]
304    pub async fn new(
305        scheduled_barriers: schedule::ScheduledBarriers,
306        env: MetaSrvEnv,
307        metadata_manager: MetadataManager,
308        hummock_manager: HummockManagerRef,
309        source_manager: SourceManagerRef,
310        sink_manager: SinkCoordinatorManager,
311        iceberg_v3_sink_manager: IcebergV3SinkManager,
312        scale_controller: ScaleControllerRef,
313        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
314        barrier_scheduler: schedule::BarrierScheduler,
315        refresh_manager: GlobalRefreshManagerRef,
316    ) -> Self {
317        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));
318
319        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
320            scheduled_barriers,
321            status,
322            metadata_manager,
323            hummock_manager,
324            source_manager,
325            scale_controller,
326            env.clone(),
327            barrier_scheduler,
328            refresh_manager,
329            sink_manager,
330            iceberg_v3_sink_manager,
331        ));
332
333        Self::new_inner(env, request_rx, context).await
334    }
335
336    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
337        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
338        let fut = (self.env.await_tree_reg())
339            .register_derived_root("Global Barrier Worker")
340            .instrument(self.run(shutdown_rx));
341        let join_handle = tokio::spawn(fut);
342
343        (join_handle, shutdown_tx)
344    }
345
346    /// Check whether we should pause on bootstrap from the system parameter and reset it.
347    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
348        let paused = self
349            .env
350            .system_params_reader()
351            .await
352            .pause_on_next_bootstrap()
353            || self.env.opts.pause_on_next_bootstrap_offline;
354
355        if paused {
356            warn!(
357                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
358                 It will now be reset to `false`. \
359                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
360                PAUSE_ON_NEXT_BOOTSTRAP_KEY
361            );
362            self.env
363                .system_params_manager_impl_ref()
364                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
365                .await?;
366        }
367        Ok(paused)
368    }
369
370    /// Start an infinite loop to take scheduled barriers and send them.
371    async fn run(mut self, shutdown_rx: Receiver<()>) {
372        tracing::info!(
373            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
374            self.enable_recovery,
375            self.checkpoint_control.in_flight_barrier_nums,
376        );
377
378        if !self.enable_recovery {
379            let job_exist = self
380                .context
381                .metadata_manager
382                .catalog_controller
383                .has_any_streaming_jobs()
384                .await
385                .unwrap();
386            if job_exist {
387                panic!(
388                    "Some streaming jobs already exist in meta, please start with recovery enabled \
389                or clean up the metadata using `./risedev clean-data`"
390                );
391            }
392        }
393
394        {
395            // Bootstrap recovery. Here we simply trigger a recovery process to achieve the
396            // consistency.
397            // Even if there's no actor to recover, we still go through the recovery process to
398            // inject the first `Initial` barrier.
399            let span = tracing::info_span!("bootstrap_recovery");
400            crate::telemetry::report_event(
401                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
402                "normal_recovery",
403                0,
404                None,
405                None,
406                None,
407            );
408
409            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
410
411            self.recovery(paused, RecoveryReason::Bootstrap)
412                .instrument(span)
413                .await;
414        }
415
416        Box::pin(self.run_inner(shutdown_rx)).await
417    }
418}
419
420impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
421    fn enable_per_database_isolation(&self) -> bool {
422        self.system_enable_per_database_isolation && {
423            if let Err(e) =
424                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
425            {
426                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
427                false
428            } else {
429                true
430            }
431        }
432    }
433
434    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
435        let (local_notification_tx, mut local_notification_rx) =
436            tokio::sync::mpsc::unbounded_channel();
437        self.env
438            .notification_manager()
439            .insert_local_sender(local_notification_tx);
440
441        // Start the event loop.
442        loop {
443            tokio::select! {
444                biased;
445
446                // Shutdown
447                _ = &mut shutdown_rx => {
448                    tracing::info!("Barrier manager is stopped");
449                    break;
450                }
451
452                request = self.request_rx.recv() => {
453                    if let Some(request) = request {
454                        match request {
455                            BarrierManagerRequest::GetBackfillProgress(result_tx) => {
456                                let progress = self.checkpoint_control.gen_backfill_progress();
457                                if result_tx.send(Ok(progress)).is_err() {
458                                    error!("failed to send get ddl progress");
459                                }
460                            }
461                            BarrierManagerRequest::GetFragmentBackfillProgress(result_tx) => {
462                                let progress =
463                                    self.checkpoint_control.gen_fragment_backfill_progress();
464                                if result_tx.send(Ok(progress)).is_err() {
465                                    error!("failed to send get fragment backfill progress");
466                                }
467                            }
468                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
469                                let progress = self.checkpoint_control.gen_cdc_progress();
470                                if result_tx.send(Ok(progress)).is_err() {
471                                    error!("failed to send get ddl progress");
472                                }
473                            }
474                            // Handle adhoc recovery triggered by user.
475                            BarrierManagerRequest::AdhocRecovery(sender) => {
476                                self.adhoc_recovery().await;
477                                if sender.send(()).is_err() {
478                                    warn!("failed to notify finish of adhoc recovery");
479                                }
480                            }
481                            BarrierManagerRequest::UpdateDatabaseBarrier( UpdateDatabaseBarrierRequest {
482                                database_id,
483                                barrier_interval_ms,
484                                checkpoint_frequency,
485                                sender,
486                            }) => {
487                                self.periodic_barriers
488                                    .update_database_barrier(
489                                        database_id,
490                                        barrier_interval_ms,
491                                        checkpoint_frequency,
492                                    );
493                                if sender.send(()).is_err() {
494                                    warn!("failed to notify finish of update database barrier");
495                                }
496                            }
497                            BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
498                                if tx.send(self.checkpoint_control.may_have_snapshot_backfilling_jobs()).is_err() {
499                                    warn!("failed to may have snapshot backfill job");
500                                }
501                            }
502                        }
503                    } else {
504                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
505                        return;
506                    }
507                }
508
509                changed_worker = self.active_streaming_nodes.changed() => {
510                    #[cfg(debug_assertions)]
511                    {
512                        self.active_streaming_nodes.validate_change().await;
513                    }
514
515                    info!(?changed_worker, "worker changed");
516
517                    match changed_worker {
518                        ActiveStreamingWorkerChange::Add(node)
519                        | ActiveStreamingWorkerChange::Update(node) => {
520                            self.partial_graph_manager
521                                .add_worker(node, self.context.clone())
522                                .await;
523                        }
524                        ActiveStreamingWorkerChange::Remove(node) => {
525                            self.partial_graph_manager.remove_worker(node);
526                        }
527                    }
528                }
529
530                notification = local_notification_rx.recv() => {
531                    let notification = notification.unwrap();
532                    if let LocalNotification::SystemParamsChange(p) = notification {
533                        {
534                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
535                            self.periodic_barriers
536                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
537                            self.system_enable_per_database_isolation = p.per_database_isolation();
538                        }
539                    }
540                }
541                complete_result = self
542                    .completing_task
543                    .next_completed_barrier(
544                        &mut self.periodic_barriers,
545                        &mut self.checkpoint_control,
546                        &mut self.partial_graph_manager,
547                        &self.context,
548                        &self.env,
549                ) => {
550                    match complete_result {
551                        Ok(output) => {
552                            self.checkpoint_control.ack_completed(&mut self.partial_graph_manager, output);
553                        }
554                        Err(e) => {
555                            self.failure_recovery(e).await;
556                        }
557                    }
558                },
559                event = self.checkpoint_control.next_event() => {
560                    let result: MetaResult<()> = try {
561                        match event {
562                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
563                                let database_id = entering_initializing.database_id();
564                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
565                                    resp.root_err.as_ref().map(|root_err| {
566                                        (*worker_id, ScoredError {
567                                            error: Status::internal(&root_err.err_msg),
568                                            score: Score(root_err.score)
569                                        })
570                                    })
571                                }));
572                                Self::report_collect_failure(&self.env, &error);
573                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
574                                let result: MetaResult<_> = try {
575                                    let runtime_info = self.context.reload_database_runtime_info(database_id).await.inspect_err(|err| {
576                                        warn!(%database_id, err = %err.as_report(), "reload runtime info failed");
577                                    })?;
578                                    let rendered_info = render_runtime_info(
579                                        self.env.actor_id_generator(),
580                                        &self.active_streaming_nodes,
581                                        &runtime_info.recovery_context,
582                                        database_id,
583                                    )
584                                    .inspect_err(|err: &MetaError| {
585                                        warn!(%database_id, err = %err.as_report(), "render runtime info failed");
586                                    })?;
587                                    if let Some(rendered_info) = rendered_info {
588                                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
589                                            database_id,
590                                            &rendered_info.job_infos,
591                                            &self.active_streaming_nodes,
592                                            &rendered_info.stream_actors,
593                                            &runtime_info.state_table_committed_epochs,
594                                        )
595                                        .inspect_err(|err| {
596                                            warn!(%database_id, err = ?err.as_report(), "database runtime info failed validation");
597                                        })?;
598                                        Some((runtime_info, rendered_info))
599                                    } else {
600                                        None
601                                    }
602                                };
603                                match result {
604                                    Ok(Some((runtime_info, rendered_info))) => {
605                                        entering_initializing.enter(
606                                            runtime_info,
607                                            rendered_info,
608                                            &mut self.partial_graph_manager,
609                                        );
610                                    }
611                                    Ok(None) => {
612                                        info!(%database_id, "database removed after reloading empty runtime info");
613                                        // mark ready to unblock subsequent request
614                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
615                                        entering_initializing.remove();
616                                    }
617                                    Err(e) => {
618                                        entering_initializing.fail_reload_runtime_info(e);
619                                    }
620                                }
621                            }
622                            CheckpointControlEvent::EnteringRunning(entering_running) => {
623                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
624                                entering_running.enter();
625                            }
626                            CheckpointControlEvent::BatchRefreshTrigger { database_id, job_id } => {
627                                self.handle_batch_refresh_trigger(database_id, job_id).await?;
628                            }
629                        }
630                    };
631                    if let Err(e) = result {
632                        self.failure_recovery(e).await;
633                    }
634                }
635                event = self.partial_graph_manager.next_event(&self.context) => {
636                    let result: MetaResult<()> = try {
637                        match event {
638                            PartialGraphManagerEvent::Worker(_worker_id, WorkerEvent::WorkerConnected) => {
639                                // no handling on new worker connected event yet
640                            }
641                            PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
642                                let failed_databases = self
643                                    .checkpoint_control
644                                    .databases_failed_at_worker_err(worker_id)
645                                    .chain(
646                                        affected_partial_graphs
647                                        .into_iter()
648                                        .map(|partial_graph_id| {
649                                            let (database_id, _) = from_partial_graph_id(partial_graph_id);
650                                            database_id
651                                        })
652                                    )
653                                    .collect::<HashSet<_>>();
654                                if !failed_databases.is_empty() {
655                                    if !self.enable_recovery {
656                                        panic!("control stream to worker {} failed but recovery not enabled: {}", worker_id, err.as_report());
657                                    }
658                                    if !self.enable_per_database_isolation() {
659                                        Err(err.clone())?;
660                                    }
661                                    Self::report_collect_failure(&self.env, &err);
662                                    for database_id in failed_databases {
663                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
664                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
665                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
666                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
667                                            // TODO: add log on blocking time
668                                            let output = self.completing_task.wait_completing_task().await?;
669                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
670                                        }
671                                    }
672                                }  else {
673                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
674                                }
675                                continue;
676                            }
677                            PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
678                                let (database_id, _creating_job_id) = from_partial_graph_id(partial_graph_id);
679                                match event {
680                                    PartialGraphEvent::BarrierCollected(collected_barrier) => {
681                                        self.checkpoint_control.barrier_collected(partial_graph_id, collected_barrier, &mut self.periodic_barriers)?;
682                                    }
683                                    PartialGraphEvent::Error(worker_id) => {
684                                        if !self.enable_recovery {
685                                            panic!("database {database_id} failure reported from {worker_id} but recovery not enabled")
686                                        }
687                                        if !self.enable_per_database_isolation() {
688                                                Err(MetaError::from(anyhow!("database {database_id} report failure from {worker_id}")))?;
689                                            }
690                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
691                                            warn!(%database_id, "database entering recovery");
692                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
693                                            // TODO: add log on blocking time
694                                            let output = self.completing_task.wait_completing_task().await?;
695                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
696                                        }
697                                    }
698                                    PartialGraphEvent::Reset(reset_resps) => {
699                                        self.checkpoint_control.on_partial_graph_reset(partial_graph_id, reset_resps);
700                                    }
701                                    PartialGraphEvent::Initialized => {
702                                        self.checkpoint_control.on_partial_graph_initialized(
703                                            partial_graph_id,
704                                            &mut self.partial_graph_manager,
705                                        )?;
706                                    }
707                                }
708                            }
709                        };
710                    };
711                    if let Err(e) = result {
712                        self.failure_recovery(e).await;
713                    }
714                }
715                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
716                    let database_id = new_barrier.database_id;
717                    let new_barrier = if matches!(
718                        new_barrier.command,
719                        Some((Command::RescheduleIntent { .. }, _))
720                    ) {
721                        let env = self.env.clone();
722                        let worker_nodes = self
723                            .active_streaming_nodes
724                            .current()
725                            .iter()
726                            .map(|(worker_id, worker)| (*worker_id, worker.clone()))
727                            .collect();
728                        let database_info = self.checkpoint_control.database_info(database_id);
729                        match resolve_reschedule_intent(
730                            env,
731                            worker_nodes,
732                            database_info,
733                            new_barrier,
734                        ) {
735                            Ok(Some(new_barrier)) => new_barrier,
736                            Ok(None) => continue,
737                            Err(err) => {
738                                self.failure_recovery(err).await;
739                                continue;
740                            }
741                        }
742                    } else {
743                        new_barrier
744                    };
745                    if let Err(e) = self.checkpoint_control.handle_new_barrier(
746                        new_barrier,
747                        &mut self.partial_graph_manager,
748                        self.active_streaming_nodes.current()
749                    ) {
750                        if !self.enable_recovery {
751                            panic!(
752                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
753                                    database_id,
754                                    e.as_report()
755                                )
756                            );
757                        }
758                        let result: MetaResult<_> = try {
759                            if !self.enable_per_database_isolation() {
760                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
761                                Err(MetaError::from(err))?;
762                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
763                                warn!(%database_id, e = %e.as_report(),"database entering recovery on inject failure");
764                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
765                                // TODO: add log on blocking time
766                                let output = self.completing_task.wait_completing_task().await?;
767                                entering_recovery.enter(output, &mut self.partial_graph_manager);
768                            }
769                        };
770                        if let Err(e) = result {
771                            self.failure_recovery(e).await;
772                        }
773                    }
774                }
775            }
776        }
777    }
778}
779
780impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
781    /// We need to make sure there are no changes when doing recovery
782    pub async fn clear_on_err(&mut self, err: &MetaError) {
783        // join spawned completing command to finish no matter it succeeds or not.
784        match replace(&mut self.completing_task, CompletingTask::None) {
785            CompletingTask::None | CompletingTask::Err(_) => {}
786            CompletingTask::Completing {
787                epochs_to_ack,
788                join_handle,
789                ..
790            } => {
791                info!("waiting for completing command to finish in recovery");
792                match join_handle.await {
793                    Err(e) => {
794                        warn!(err = %e.as_report(), "failed to join completing task");
795                    }
796                    Ok(Err(e)) => {
797                        warn!(
798                            err = %e.as_report(),
799                            "failed to complete barrier during clear"
800                        );
801                    }
802                    Ok(Ok(hummock_version_stats)) => {
803                        self.checkpoint_control.ack_completed(
804                            &mut self.partial_graph_manager,
805                            BarrierCompleteOutput {
806                                epochs_to_ack,
807                                hummock_version_stats,
808                            },
809                        );
810                    }
811                }
812            }
813        };
814        self.partial_graph_manager.notify_all_err(err);
815    }
816}
817
818impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
819    /// Handle a batch refresh trigger: load metadata + log epochs, then start a logstore
820    /// consumption run for the given batch refresh job.
821    async fn handle_batch_refresh_trigger(
822        &mut self,
823        database_id: DatabaseId,
824        job_id: JobId,
825    ) -> MetaResult<()> {
826        // 1. Get the last committed epoch for this job (read-only).
827        let last_committed_epoch = self
828            .checkpoint_control
829            .get_batch_refresh_trigger_info(database_id, job_id);
830
831        // 2. Load context metadata + resolve log epochs asynchronously.
832        let context = self
833            .context
834            .load_batch_refresh_trigger_context(job_id, database_id, last_committed_epoch)
835            .await?;
836
837        // 3. Start the refresh run.
838        let started = self.checkpoint_control.start_batch_refresh_run(
839            database_id,
840            job_id,
841            &context,
842            self.active_streaming_nodes.current(),
843            self.env.actor_id_generator(),
844            &mut self.partial_graph_manager,
845        )?;
846
847        // 5. Update shared_actor_infos with the new fragment infos.
848        if started {
849            self.checkpoint_control
850                .apply_batch_refresh_fragment_infos(database_id, job_id);
851        }
852
853        Ok(())
854    }
855}
856
857impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
858    /// Set barrier manager status.
859    async fn failure_recovery(&mut self, err: MetaError) {
860        self.clear_on_err(&err).await;
861
862        if self.enable_recovery {
863            let span = tracing::info_span!(
864                "failure_recovery",
865                error = %err.as_report(),
866            );
867
868            crate::telemetry::report_event(
869                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
870                "failure_recovery",
871                0,
872                None,
873                None,
874                None,
875            );
876
877            let reason = RecoveryReason::Failover(err);
878
879            // No need to clean dirty tables for barrier recovery,
880            // The foreground stream job should cleanup their own tables.
881            self.recovery(false, reason).instrument(span).await;
882        } else {
883            panic!(
884                "a streaming error occurred while recovery is disabled, aborting: {:?}",
885                err.as_report()
886            );
887        }
888    }
889
890    async fn adhoc_recovery(&mut self) {
891        let err = MetaErrorInner::AdhocRecovery.into();
892        self.clear_on_err(&err).await;
893
894        let span = tracing::info_span!(
895            "adhoc_recovery",
896            error = %err.as_report(),
897        );
898
899        crate::telemetry::report_event(
900            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
901            "adhoc_recovery",
902            0,
903            None,
904            None,
905            None,
906        );
907
908        // No need to clean dirty tables for barrier recovery,
909        // The foreground stream job should cleanup their own tables.
910        self.recovery(false, RecoveryReason::Adhoc)
911            .instrument(span)
912            .await;
913    }
914}
915
916impl<C> GlobalBarrierWorker<C> {
917    /// Send barrier-complete-rpc and wait for responses from all CNs
918    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
919        // Record failure in event log.
920        use risingwave_pb::meta::event_log;
921        let event = event_log::EventCollectBarrierFail {
922            error: error.to_report_string(),
923        };
924        env.event_log_manager_ref()
925            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
926    }
927}
928
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Retry base in milliseconds, passed to `ExponentialBackoff::from_millis`.
    // NOTE(review): in tokio-retry, `from_millis(base)` uses `base` both as the first delay
    // and as the multiplier between attempts (20ms, 400ms, 8s -> capped), not as a fixed
    // step; confirm against the pinned tokio-retry version before tuning this value.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Upper bound applied to each backoff delay via `max_delay`.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    // MrCroxx: Use concrete type here to prevent unsolved compiler issue.
    // Feel free to replace the concrete type with TAIT after fixed.
    // The commented-out module below is the intended TAIT-based version, kept for reference.

    // mod retry_backoff_future {
    //     use std::future::Future;
    //     use std::time::Duration;
    //
    //     use tokio::time::sleep;
    //
    //     pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
    //
    //     #[define_opaque(RetryBackoffFuture)]
    //     pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
    //         Box::pin(sleep(duration))
    //     }
    // }
    // pub(crate) use retry_backoff_future::*;

    /// A boxed, pinned tokio sleep future used as one backoff step.
    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    /// Builds a backoff future that completes after `duration`.
    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    /// Opaque iterator of backoff futures; defined by `get_retry_backoff_strategy`.
    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// Initialize a retry strategy for operation in recovery.
    ///
    /// Yields exponentially growing delays starting from `RECOVERY_RETRY_BASE_INTERVAL` ms,
    /// each capped at `RECOVERY_RETRY_MAX_INTERVAL` and then passed through `jitter`
    /// (NOTE(review): tokio-retry's `jitter` scales each delay by a random factor; confirm).
    /// The iterator has no length limit here, so callers retry until success.
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    /// Same delays as `get_retry_strategy`, each turned into a sleep future.
    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}
979
980pub(crate) use retry_strategy::*;
981use risingwave_common::error::tonic::extra::{Score, ScoredError};
982use risingwave_pb::meta::event_log::{Event, EventRecovery};
983
984use crate::barrier::partial_graph::{
985    PartialGraphEvent, PartialGraphManager, PartialGraphManagerEvent, WorkerEvent,
986};
987
988impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
989    /// Recovery the whole cluster from the latest epoch.
990    ///
991    /// If `paused_reason` is `Some`, all data sources (including connectors and DMLs) will be
992    /// immediately paused after recovery, until the user manually resume them either by restarting
993    /// the cluster or `risectl` command. Used for debugging purpose.
994    ///
995    /// Returns the new state of the barrier manager after recovery.
996    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
997        // Clear all control streams to release resources (connections to compute nodes) first.
998        self.partial_graph_manager.clear_worker();
999
1000        let reason_str = match &recovery_reason {
1001            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
1002            RecoveryReason::Failover(err) => {
1003                format!("failed over: {}", err.as_report())
1004            }
1005            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
1006        };
1007        self.context.abort_and_mark_blocked(None, recovery_reason);
1008
1009        self.recovery_inner(is_paused, reason_str).await;
1010        self.context.mark_ready(MarkReadyOptions::Global {
1011            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
1012        });
1013    }
1014
1015    #[await_tree::instrument("recovery({recovery_reason})")]
1016    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
1017        let event_log_manager_ref = self.env.event_log_manager_ref();
1018
1019        tracing::info!("recovery start!");
1020        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1021            EventRecovery::global_recovery_start(recovery_reason.clone()),
1022        )]);
1023
1024        let retry_strategy = get_retry_strategy();
1025
1026        // We take retry into consideration because this is the latency user sees for a cluster to
1027        // get recovered.
1028        let recovery_timer = GLOBAL_META_METRICS
1029            .recovery_latency
1030            .with_label_values(&["global"])
1031            .start_timer();
1032
1033        let enable_per_database_isolation = self.enable_per_database_isolation();
1034
1035        let recovery_future = tokio_retry::Retry::spawn(retry_strategy, || async {
1036            self.env.stream_client_pool().invalidate_all();
1037            // We need to notify_creating_job_failed in every recovery retry, because in outer create_streaming_job handler,
1038            // it holds the reschedule_read_lock and wait for creating job to finish, and caused the following scale_actor fail
1039            // to acquire the reschedule_write_lock, and then keep recovering, and then deadlock.
1040            // TODO: refactor and fix this hacky implementation.
1041            self.context
1042                .notify_creating_job_failed(None, recovery_reason.clone())
1043                .await;
1044
1045            let runtime_info_snapshot = self
1046                .context
1047                .reload_runtime_info()
1048                .await?;
1049            let BarrierWorkerRuntimeInfoSnapshot {
1050                active_streaming_nodes,
1051                recovery_context,
1052                mut state_table_committed_epochs,
1053                mut state_table_log_epochs,
1054                mut mv_depended_subscriptions,
1055                mut background_jobs,
1056                hummock_version_stats,
1057                database_infos,
1058                mut cdc_table_snapshot_splits,
1059            } = runtime_info_snapshot;
1060
1061            let mut partial_graph_manager = PartialGraphManager::recover(
1062                    self.env.clone(),
1063                    active_streaming_nodes.current(),
1064                    self.context.clone(),
1065                )
1066                .await;
1067            {
1068                let mut empty_databases = HashSet::new();
1069                let mut collected_databases = HashMap::new();
1070                let mut collecting_databases = HashMap::new();
1071                let mut failed_databases = HashMap::new();
1072                for &database_id in recovery_context.fragment_context.database_map.keys() {
1073                    let mut recoverer = partial_graph_manager.start_recover();
1074                    let result: MetaResult<_> = try {
1075                        let Some(rendered_info) = render_runtime_info(
1076                            self.env.actor_id_generator(),
1077                            &active_streaming_nodes,
1078                            &recovery_context,
1079                            database_id,
1080                        )
1081                            .inspect_err(|err: &MetaError| {
1082                                warn!(%database_id, err = %err.as_report(), "render runtime info failed");
1083                            })? else {
1084                            empty_databases.insert(database_id);
1085                            continue;
1086                        };
1087                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
1088                            database_id,
1089                            &rendered_info.job_infos,
1090                            &active_streaming_nodes,
1091                            &rendered_info.stream_actors,
1092                            &state_table_committed_epochs,
1093                        )
1094                        .inspect_err(|err| {
1095                            warn!(%database_id, err = %err.as_report(), "rendered runtime info failed validation");
1096                        })?;
1097                        let RenderedDatabaseRuntimeInfo {
1098                            job_infos,
1099                            stream_actors,
1100                            mut source_splits,
1101                            batch_refresh,
1102                        } = rendered_info;
1103                        recoverer.inject_database_initial_barrier(
1104                            database_id,
1105                            job_infos,
1106                            &recovery_context.job_extra_info,
1107                            &mut state_table_committed_epochs,
1108                            &mut state_table_log_epochs,
1109                            &recovery_context.fragment_relations,
1110                            &stream_actors,
1111                            &mut source_splits,
1112                            &mut background_jobs,
1113                            &mut mv_depended_subscriptions,
1114                            is_paused,
1115                            &hummock_version_stats,
1116                            &mut cdc_table_snapshot_splits,
1117                            batch_refresh,
1118                        )?
1119                    };
1120                    let collector = match result {
1121                        Ok(database) => {
1122                            DatabaseInitialBarrierCollector {
1123                                database_id,
1124                                initializing_partial_graphs: recoverer.all_initializing(),
1125                                database,
1126                            }
1127                        }
1128                        Err(e) => {
1129                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
1130                            assert!(failed_databases.insert(database_id, recoverer.failed()).is_none(), "non-duplicate");
1131                            continue;
1132                        }
1133                    };
1134                    if !collector.is_collected() {
1135                        assert!(collecting_databases.insert(database_id, collector).is_none());
1136                    } else {
1137                        warn!(%database_id, "database has no node to inject initial barrier");
1138                        assert!(collected_databases.insert(database_id, collector.finish()).is_none());
1139                    }
1140                }
1141                if !empty_databases.is_empty() {
1142                    info!(?empty_databases, "empty database in global recovery");
1143                }
1144                while !collecting_databases.is_empty() {
1145                    match partial_graph_manager.next_event(&self.context).await {
1146                        PartialGraphManagerEvent::Worker(_, WorkerEvent::WorkerConnected) => {
1147                            // not handle WorkerConnected yet
1148                        }
1149                        PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
1150                            let affected_databases: HashSet<_> = affected_partial_graphs.into_iter().map(|partial_graph_id| {
1151                                let (database_id, _) = from_partial_graph_id(partial_graph_id);
1152                                database_id
1153                            }).collect();
1154                            warn!(%worker_id, err = %err.as_report(), "worker node failure during recovery");
1155                            for (failed_database_id, collector) in collecting_databases.extract_if(|database_id, collector| {
1156                                !collector.is_valid_after_worker_err(worker_id) || affected_databases.contains(database_id)
1157                            }) {
1158                                warn!(%failed_database_id, %worker_id, "database failed to recovery in global recovery due to worker node err");
1159                                let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
1160                                partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1161                                assert!(failed_databases.insert(failed_database_id, resetting_partial_graphs).is_none());
1162                            }
1163                        }
1164                        PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
1165                            match event {
1166                                PartialGraphEvent::BarrierCollected(_) => {
1167                                    unreachable!("no barrier collected event on initializing")
1168                                }
1169                                PartialGraphEvent::Reset(_) => {
1170                                    unreachable!("no partial graph reset on initializing")
1171                                }
1172                                PartialGraphEvent::Error(worker_id) => {
1173                                    let (database_id, _) = from_partial_graph_id(partial_graph_id);
1174                                    if let Some(collector) = collecting_databases.remove(&database_id) {
1175                                        warn!(%database_id, %worker_id, "database reset during global recovery");
1176                                        let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
1177                                        partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1178                                        assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
1179                                    } else if let Some(database) = collected_databases.remove(&database_id) {
1180                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
1181                                        let resetting_partial_graphs: HashSet<_> = database_partial_graphs(database_id, database.independent_checkpoint_job_controls.keys().copied()).collect();
1182                                        partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1183                                        assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
1184                                    } else {
1185                                        assert!(failed_databases.contains_key(&database_id));
1186                                    }
1187                                }
1188                                PartialGraphEvent::Initialized => {
1189                                    let (database_id, _) = from_partial_graph_id(partial_graph_id);
1190                                    if failed_databases.contains_key(&database_id) {
1191                                        assert!(!collecting_databases.contains_key(&database_id));
1192                                        // ignore the lately initialized partial graph of failed database
1193                                        continue;
1194                                    }
1195                                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
1196                                        unreachable!("should exist")
1197                                    };
1198                                    let collector = entry.get_mut();
1199                                    collector.partial_graph_initialized(partial_graph_id);
1200                                    if collector.is_collected() {
1201                                        let collector = entry.remove();
1202                                        assert!(collected_databases.insert(database_id, collector.finish()).is_none());
1203                                    }
1204                                }
1205                            }
1206                        }
1207                    }
1208                }
1209                debug!("collected initial barrier");
1210                if !background_jobs.is_empty() {
1211                    warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background mview in recovery");
1212                }
1213                if !mv_depended_subscriptions.is_empty() {
1214                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
1215                }
1216                if !state_table_committed_epochs.is_empty() {
1217                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
1218                }
1219                if !enable_per_database_isolation && !failed_databases.is_empty() {
1220                    return Err(anyhow!(
1221                        "global recovery failed due to failure of databases {:?}",
1222                        failed_databases.keys().collect_vec()).into()
1223                    );
1224                }
1225                let checkpoint_control = CheckpointControl::recover(
1226                    collected_databases,
1227                    failed_databases,
1228                    hummock_version_stats,
1229                    self.env.clone(),
1230                );
1231
1232                let reader = self.env.system_params_reader().await;
1233                let checkpoint_frequency = reader.checkpoint_frequency();
1234                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
1235                let periodic_barriers = PeriodicBarriers::new(
1236                    barrier_interval,
1237                    checkpoint_frequency,
1238                    database_infos,
1239                );
1240
1241                Ok((
1242                    active_streaming_nodes,
1243                    partial_graph_manager,
1244                    checkpoint_control,
1245                    periodic_barriers,
1246                ))
1247            }
1248        }.inspect_err(|err: &MetaError| {
1249            tracing::error!(error = %err.as_report(), "recovery failed");
1250            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1251                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
1252            )]);
1253            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
1254        }))
1255        .instrument(tracing::info_span!("recovery_attempt"));
1256
1257        let mut recover_txs = vec![];
1258        let mut update_barrier_requests = vec![];
1259        pin_mut!(recovery_future);
1260        let mut request_rx_closed = false;
1261        let new_state = loop {
1262            select! {
1263                biased;
1264                new_state = &mut recovery_future => {
1265                    break new_state.expect("Retry until recovery success.");
1266                }
1267                request = pin!(self.request_rx.recv()), if !request_rx_closed => {
1268                    let Some(request) = request else {
1269                        warn!("request rx channel closed during recovery");
1270                        request_rx_closed = true;
1271                        continue;
1272                    };
1273                    match request {
1274                        BarrierManagerRequest::GetBackfillProgress(tx) => {
1275                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1276                        }
1277                        BarrierManagerRequest::GetFragmentBackfillProgress(tx) => {
1278                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1279                        }
1280                        BarrierManagerRequest::GetCdcProgress(tx) => {
1281                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1282                        }
1283                        BarrierManagerRequest::AdhocRecovery(tx) => {
1284                            recover_txs.push(tx);
1285                        }
1286                        BarrierManagerRequest::UpdateDatabaseBarrier(request) => {
1287                            update_barrier_requests.push(request);
1288                        }
1289                        BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
1290                            // may recover snapshot backfill jobs
1291                            let _ = tx.send(true);
1292                        }
1293                    }
1294                }
1295            }
1296        };
1297
1298        let duration = recovery_timer.stop_and_record();
1299
1300        (
1301            self.active_streaming_nodes,
1302            self.partial_graph_manager,
1303            self.checkpoint_control,
1304            self.periodic_barriers,
1305        ) = new_state;
1306
1307        tracing::info!("recovery success");
1308
1309        for UpdateDatabaseBarrierRequest {
1310            database_id,
1311            barrier_interval_ms,
1312            checkpoint_frequency,
1313            sender,
1314        } in update_barrier_requests
1315        {
1316            self.periodic_barriers.update_database_barrier(
1317                database_id,
1318                barrier_interval_ms,
1319                checkpoint_frequency,
1320            );
1321            let _ = sender.send(());
1322        }
1323
1324        for tx in recover_txs {
1325            let _ = tx.send(());
1326        }
1327
1328        let recovering_databases = self
1329            .checkpoint_control
1330            .recovering_databases()
1331            .map(|database| database.as_raw_id())
1332            .collect_vec();
1333        let running_databases = self
1334            .checkpoint_control
1335            .running_databases()
1336            .map(|database| database.as_raw_id())
1337            .collect_vec();
1338
1339        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1340            EventRecovery::global_recovery_success(
1341                recovery_reason.clone(),
1342                duration as f32,
1343                running_databases,
1344                recovering_databases,
1345            ),
1346        )]);
1347
1348        self.env
1349            .notification_manager()
1350            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
1351        self.env
1352            .notification_manager()
1353            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
1354    }
1355}