risingwave_meta/barrier/
worker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19use std::time::Duration;
20
21use anyhow::anyhow;
22use arc_swap::ArcSwap;
23use futures::TryFutureExt;
24use itertools::Itertools;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_pb::meta::Recovery;
29use risingwave_pb::meta::subscribe_response::{Info, Operation};
30use risingwave_pb::stream_service::streaming_control_stream_response::Response;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tokio::sync::oneshot::{Receiver, Sender};
34use tokio::task::JoinHandle;
35use tokio::time::Instant;
36use tonic::Status;
37use tracing::{Instrument, debug, error, info, warn};
38use uuid::Uuid;
39
40use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
41use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
42use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
43use crate::barrier::rpc::{ControlStreamManager, merge_node_rpc_errors};
44use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
45use crate::barrier::{
46    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, RecoveryReason,
47    schedule,
48};
49use crate::error::MetaErrorInner;
50use crate::hummock::HummockManagerRef;
51use crate::manager::sink_coordination::SinkCoordinatorManager;
52use crate::manager::{
53    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
54    MetadataManager,
55};
56use crate::rpc::metrics::GLOBAL_META_METRICS;
57use crate::stream::{ScaleControllerRef, SourceManagerRef};
58use crate::{MetaError, MetaResult};
59
/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
/// collect them, with monotonic increasing epoch numbers. On compute nodes, `LocalBarrierManager`
/// in `risingwave_stream` crate will serve these requests and dispatch them to source actors.
///
/// Configuration change in our system is achieved by the mutation in the barrier. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state machine,
/// accepting [`crate::barrier::command::Command`] that carries info to build `Mutation`. To keep the consistency between
/// barrier manager and meta store, some actions like "drop materialized view" or "create mv on mv"
/// must be done in barrier manager transactional using [`crate::barrier::command::Command`].
pub(super) struct GlobalBarrierWorker<C> {
    /// Enable recovery or not when failover.
    enable_recovery: bool,

    /// The queue of scheduled barriers.
    periodic_barriers: PeriodicBarriers,

    /// The max barrier nums in flight
    in_flight_barrier_nums: usize,

    /// Whether a failure is isolated to the failing database (per-database reset)
    /// instead of escalating to a global failure recovery.
    /// Kept in sync with the system parameter via `LocalNotification::SystemParamsChange`.
    enable_per_database_isolation: bool,

    /// Shared context for interacting with the rest of the meta node
    /// (scheduling, metadata reload, blocking/ready marking, notifications).
    pub(super) context: Arc<C>,

    /// Meta server environment: options, system params, notification and event-log managers.
    env: MetaSrvEnv,

    /// Tracks in-flight barriers and per-database recovery state transitions.
    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    /// Receiver of external requests (DDL progress queries, ad-hoc recovery).
    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    /// Watches membership changes of streaming compute nodes.
    active_streaming_nodes: ActiveStreamingWorkerNodes,

    // Sink coordinator manager; held by the worker (its use is not visible in this
    // chunk — presumably driven during recovery; verify against recovery_inner).
    sink_manager: SinkCoordinatorManager,

    /// Manages the bidirectional control streams to all compute nodes; the term id
    /// is attached when registering a worker's stream.
    control_stream_manager: ControlStreamManager,

    // Term identifier passed to compute nodes when adding control streams.
    // Initialized to "uninitialized"; presumably refreshed during recovery
    // (assignment not visible in this chunk).
    term_id: String,
}
101
impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
    ///
    /// Reads the initial configuration (recovery flag, in-flight barrier limit,
    /// barrier interval, checkpoint frequency, per-database isolation) from the
    /// meta options and system parameters, and wires up the shared context.
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;
        let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;

        // Compute-node membership starts uninitialized; it is populated later
        // (outside this constructor) before changes are observed in the loop.
        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::uninitialized(metadata_manager.clone());

        // Status starts as `Starting`; the shared context owns the swap cell.
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
        ));

        let control_stream_manager = ControlStreamManager::new(env.clone());

        // Snapshot current system params for the initial barrier scheduling config.
        let reader = env.system_params_reader().await;
        let checkpoint_frequency = reader.checkpoint_frequency() as _;
        let enable_per_database_isolation = reader.per_database_isolation();
        let interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
        let periodic_barriers = PeriodicBarriers::new(interval, checkpoint_frequency);
        tracing::info!(
            "Starting barrier scheduler with: checkpoint_frequency={:?}",
            checkpoint_frequency,
        );

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            in_flight_barrier_nums,
            enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            // Placeholder term id until recovery assigns a real one.
            term_id: "uninitialized".into(),
        }
    }

    /// Spawn the worker's event loop on a new tokio task.
    ///
    /// Returns the join handle of the spawned task together with a oneshot
    /// sender used to signal shutdown.
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }

    /// Check whether we should pause on bootstrap from the system parameter and reset it.
    ///
    /// Returns `true` if the cluster should come up with all data sources paused.
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                 It will now be reset to `false`. \
                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            // Reset the one-shot flag so the next bootstrap is not paused again.
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

    /// Start an infinite loop to take scheduled barriers and send them.
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.in_flight_barrier_nums,
        );

        // With recovery disabled, refuse to start if streaming jobs already exist:
        // they could never be brought back up, so fail loudly instead.
        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            // Bootstrap recovery. Here we simply trigger a recovery process to achieve the
            // consistency.
            // Even if there's no actor to recover, we still go through the recovery process to
            // inject the first `Initial` barrier.
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            // Honor (and reset) `pause_on_next_bootstrap`; on error, default to not paused.
            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}
242
impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// The main event loop: drives external requests, worker membership changes,
    /// system-parameter updates, barrier completion, per-database recovery state
    /// transitions, control-stream responses and barrier injection, until shutdown.
    async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        // Subscribe to local notifications to observe system parameter changes.
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;

        // Start the event loop.
        loop {
            tokio::select! {
                // `biased` makes shutdown the highest-priority arm.
                biased;

                // Shutdown
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                // External requests from other meta components.
                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            // Handle adhoc recovery triggered by user.
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                // Compute-node membership changes.
                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    // Added or updated workers get a control stream registered immediately.
                    if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
                        self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                    }
                }

                // System parameter changes: refresh barrier interval, checkpoint
                // frequency and per-database isolation on the fly.
                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_checkpoint_frequency(p.checkpoint_frequency() as usize);
                            self.enable_per_database_isolation = p.per_database_isolation();
                        }
                    }
                }

                // A completing barrier task finished (successfully or not).
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            // Completion failures escalate to global failure recovery.
                            self.failure_recovery(e).await;
                        }
                    }
                },

                // Per-database recovery state transitions.
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                // The database finished resetting on all nodes; merge the
                                // per-worker root errors into one scored error for reporting.
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await? { Some(runtime_info) => {
                                    runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                        warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                    })?;
                                    let workers = runtime_info.database_fragment_info.workers();
                                    // Make sure every worker hosting this database's fragments
                                    // has a control stream before re-entering.
                                    for worker_id in workers {
                                        if !self.control_stream_manager.contains_worker(worker_id) {
                                            let node = self.active_streaming_nodes.current()[&worker_id].clone();
                                            self.control_stream_manager.try_add_worker(node, entering_initializing.control().inflight_infos(), self.term_id.clone(), &*self.context).await;
                                        }
                                    }
                                    entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                } _ => {
                                    // Empty runtime info: the database no longer exists.
                                    info!(database_id = database_id.database_id, "database removed after reloading empty runtime info");
                                    // mark ready to unblock subsequent request
                                    self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                    entering_initializing.remove();
                                }}
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                // Recovery of this database finished; unblock it.
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }

                // Responses (or stream errors) from compute-node control streams.
                (worker_id, resp_result) = self.control_stream_manager.next_response() => {
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                // The control stream to this worker broke; find the
                                // databases that had barriers pending on it.
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    // Without per-database isolation, escalate to a
                                    // global failure recovery via the outer `try`.
                                    if !self.enable_per_database_isolation {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    // Otherwise reset only the affected databases.
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(worker_id, database_id = database_id.database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            // TODO: add log on blocking time
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                }  else {
                                    warn!(worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                // Without per-database isolation, escalate to global recovery.
                                if !self.enable_per_database_isolation {
                                        Err(anyhow!("database {} reset", resp.database_id))?;
                                    }
                                let database_id = DatabaseId::new(resp.database_id);
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(database_id = database_id.database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    // TODO: add log on blocking time
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                // NOTE(review): message likely means "unexpected response";
                                // runtime string left unchanged here.
                                Err(anyhow!("get expected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }

                // Inject the next scheduled/periodic barrier, gated on the
                // in-flight barrier limit.
                new_barrier = self.periodic_barriers.next_barrier(&*self.context),
                    if self
                        .checkpoint_control
                        .can_inject_barrier(self.in_flight_barrier_nums) => {
                    if let Some(failed_databases) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager)
                        && !failed_databases.is_empty() {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier for some databases but recovery not enabled: {:?}",
                                failed_databases.iter().map(|(database_id, e)| (database_id, e.as_report())).collect_vec()
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation {
                                // Escalate: fold all injection failures into one global error.
                                let errs = failed_databases.iter().map(|(database_id, e)| (database_id, e.as_report())).collect_vec();
                                let err = anyhow!("failed to inject barrier for databases: {:?}", errs);
                                Err(err)?;
                            } else {
                                // Reset each failed database individually.
                                for (database_id, err) in failed_databases {
                                    if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                        warn!(%database_id, e = %err.as_report(),"database entering recovery on inject failure");
                                        self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(err).context("inject barrier failure").into()));
                                        // TODO: add log on blocking time
                                        let output = self.completing_task.wait_completing_task().await?;
                                        entering_recovery.enter(output, &mut self.control_stream_manager);
                                    }
                                }
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            // Refresh barrier-count metrics after handling each event.
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}
472
impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// We need to make sure there are no changes when doing recovery
    ///
    /// Joins any in-flight completing task; if it finished cleanly, keeps
    /// completing barriers that were already collected so committed progress is
    /// not thrown away. Finally clears the checkpoint state with `err`.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // join spawned completing command to finish no matter it succeeds or not.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        // The task panicked or was cancelled.
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        // The task committed successfully; ack it before clearing.
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // continue to finish the pending collected barrier.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        // Stop draining on the first failure.
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}
537
impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Clear in-flight state for `err` and, if recovery is enabled, run a
    /// failover recovery; panics otherwise since the error cannot be handled.
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            // No need to clean dirty tables for barrier recovery,
            // The foreground stream job should cleanup their own tables.
            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

    /// Run a recovery explicitly requested by the user (e.g. via
    /// `BarrierManagerRequest::AdhocRecovery`), regardless of any prior error.
    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        // No need to clean dirty tables for barrier recovery,
        // The foreground stream job should cleanup their own tables.
        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}
596
impl<C> GlobalBarrierWorker<C> {
    /// Record a barrier collection failure in the meta event log.
    /// Note: despite what older docs suggested, this sends no RPC — it only
    /// appends an `EventCollectBarrierFail` entry.
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        // Record failure in event log.
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}
609
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Retry base interval in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Retry max interval.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    // Inner module so the TAIT below has exactly one defining use
    // (`get_retry_backoff_future`) in its scope.
    mod retry_backoff_future {
        use std::future::Future;
        use std::time::Duration;

        use tokio::time::sleep;

        // Opaque alias (type-alias impl-trait) for the boxed sleep future, so
        // callers don't depend on the concrete future type.
        pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
        pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
            Box::pin(sleep(duration))
        }
    }
    pub(crate) use retry_backoff_future::*;

    // Opaque iterator of backoff futures; defined by `get_retry_backoff_strategy`.
    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    #[inline(always)]
    /// Initialize a retry strategy for operation in recovery.
    ///
    /// Exponential backoff starting at `RECOVERY_RETRY_BASE_INTERVAL` ms, capped
    /// at `RECOVERY_RETRY_MAX_INTERVAL`, with jitter to avoid synchronized retries.
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    /// Same schedule as [`get_retry_strategy`], but yields ready-to-await sleep futures.
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}
648
649pub(crate) use retry_strategy::*;
650use risingwave_common::error::tonic::extra::{Score, ScoredError};
651use risingwave_meta_model::WorkerId;
652use risingwave_pb::meta::event_log::{Event, EventRecovery};
653
654use crate::barrier::edge_builder::FragmentEdgeBuilder;
655
656impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Recovery the whole cluster from the latest epoch.
    ///
    /// If `is_paused` is `true`, all data sources (including connectors and DMLs) will be
    /// immediately paused after recovery, until the user manually resumes them either by
    /// restarting the cluster or by `risectl` command. Used for debugging purpose.
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Clear all control streams to release resources (connections to compute nodes) first.
        self.control_stream_manager.clear();

        // Human-readable reason string for logging / event reporting.
        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };

        // Block new scheduling and abort in-flight work globally before recovering.
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        // Unblock everything except databases that are still recovering individually.
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }
683
684    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
685        let event_log_manager_ref = self.env.event_log_manager_ref();
686
687        tracing::info!("recovery start!");
688        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
689            EventRecovery::global_recovery_start(recovery_reason.clone()),
690        )]);
691
692        let retry_strategy = get_retry_strategy();
693
694        // We take retry into consideration because this is the latency user sees for a cluster to
695        // get recovered.
696        let recovery_timer = GLOBAL_META_METRICS
697            .recovery_latency
698            .with_label_values(&["global"])
699            .start_timer();
700
701        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
702            // We need to notify_creating_job_failed in every recovery retry, because in outer create_streaming_job handler,
703            // it holds the reschedule_read_lock and wait for creating job to finish, and caused the following scale_actor fail
704            // to acquire the reschedule_write_lock, and then keep recovering, and then deadlock.
705            // TODO: refactor and fix this hacky implementation.
706            self.context
707                .notify_creating_job_failed(None, recovery_reason.clone())
708                .await;
709            let runtime_info_snapshot = self
710                .context
711                .reload_runtime_info()
712                .await?;
713            runtime_info_snapshot.validate().inspect_err(|e| {
714                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
715            })?;
716            let BarrierWorkerRuntimeInfoSnapshot {
717                active_streaming_nodes,
718                database_fragment_infos,
719                mut state_table_committed_epochs,
720                mut subscription_infos,
721                stream_actors,
722                fragment_relations,
723                mut source_splits,
724                mut background_jobs,
725                hummock_version_stats,
726            } = runtime_info_snapshot;
727
728            self.sink_manager.reset().await;
729            let term_id = Uuid::new_v4().to_string();
730
731            let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
732            let reset_start_time = Instant::now();
733            let unconnected_worker = control_stream_manager
734                .reset(
735                    active_streaming_nodes.current(),
736                    term_id.clone(),
737                    &*self.context,
738                )
739                .await;
740            info!(elapsed=?reset_start_time.elapsed(), ?unconnected_worker, "control stream reset");
741
742            {
743                let mut builder = FragmentEdgeBuilder::new(database_fragment_infos.values().flat_map(|info| info.fragment_infos()));
744                builder.add_relations(&fragment_relations);
745                let mut edges = builder.build();
746
747                let mut collected_databases = HashMap::new();
748                let mut collecting_databases = HashMap::new();
749                let mut failed_databases = HashSet::new();
750                for (database_id, info) in database_fragment_infos {
751                    let result = control_stream_manager.inject_database_initial_barrier(
752                        database_id,
753                        info,
754                        &mut state_table_committed_epochs,
755                        &mut edges,
756                        &stream_actors,
757                        &mut source_splits,
758                        &mut background_jobs,
759                        subscription_infos.remove(&database_id).unwrap_or_default(),
760                        is_paused,
761                        &hummock_version_stats,
762                    );
763                    let node_to_collect = match result {
764                        Ok(info) => {
765                            info
766                        }
767                        Err(e) => {
768                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
769                            assert!(failed_databases.insert(database_id), "non-duplicate");
770                            continue;
771                        }
772                    };
773                    if !node_to_collect.is_collected() {
774                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
775                    } else {
776                        warn!(database_id = database_id.database_id, "database has no node to inject initial barrier");
777                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
778                    }
779                }
780                while !collecting_databases.is_empty() {
781                    let (worker_id, result) =
782                        control_stream_manager.next_response().await;
783                    let resp = match result {
784                        Err(e) => {
785                            warn!(worker_id, err = %e.as_report(), "worker node failure during recovery");
786                            for (failed_database_id,_ ) in collecting_databases.extract_if(|_, node_to_collect| {
787                                !node_to_collect.is_valid_after_worker_err(worker_id)
788                            }) {
789                                warn!(%failed_database_id, worker_id, "database failed to recovery in global recovery due to worker node err");
790                                assert!(failed_databases.insert(failed_database_id));
791                            }
792                            continue;
793                        }
794                        Ok(resp) => {
795                            match resp {
796                                Response::CompleteBarrier(resp) => {
797                                    resp
798                                }
799                                Response::ReportDatabaseFailure(resp) => {
800                                    let database_id = DatabaseId::new(resp.database_id);
801                                    if collecting_databases.remove(&database_id).is_some() {
802                                        warn!(%database_id, worker_id, "database reset during global recovery");
803                                        assert!(failed_databases.insert(database_id));
804                                    } else if collected_databases.remove(&database_id).is_some() {
805                                        warn!(%database_id, worker_id, "database initialized but later reset during global recovery");
806                                        assert!(failed_databases.insert(database_id));
807                                    } else {
808                                        assert!(failed_databases.contains(&database_id));
809                                    }
810                                    continue;
811                                }
812                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_))=> {
813                                    return Err(anyhow!("get unexpected resp {:?}", other).into());
814                                }
815                            }
816                        }
817                    };
818                    assert_eq!(worker_id, resp.worker_id as WorkerId);
819                    let database_id = DatabaseId::new(resp.database_id);
820                    if failed_databases.contains(&database_id) {
821                        assert!(!collecting_databases.contains_key(&database_id));
822                        // ignore the lately arrived collect resp of failed database
823                        continue;
824                    }
825                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
826                        unreachable!("should exist")
827                    };
828                    let node_to_collect = entry.get_mut();
829                    node_to_collect.collect_resp(resp);
830                    if node_to_collect.is_collected() {
831                        let node_to_collect = entry.remove();
832                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
833                    }
834                }
835                debug!("collected initial barrier");
836                if !stream_actors.is_empty() {
837                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
838                }
839                if !source_splits.is_empty() {
840                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
841                }
842                if !background_jobs.is_empty() {
843                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mview in recovery");
844                }
845                if !subscription_infos.is_empty() {
846                    warn!(?subscription_infos, "unused subscription infos in recovery");
847                }
848                if !state_table_committed_epochs.is_empty() {
849                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
850                }
851                if !self.enable_per_database_isolation && !failed_databases.is_empty() {
852                    return Err(anyhow!(
853                        "global recovery failed due to failure of databases {:?}",
854                        failed_databases.iter().map(|database_id| database_id.database_id).collect_vec()).into()
855                    );
856                }
857                let checkpoint_control = CheckpointControl::recover(
858                    collected_databases,
859                    failed_databases,
860                    &mut control_stream_manager,
861                    hummock_version_stats,
862                );
863                Ok((
864                    active_streaming_nodes,
865                    control_stream_manager,
866                    checkpoint_control,
867                    term_id,
868                ))
869            }
870        }.inspect_err(|err: &MetaError| {
871            tracing::error!(error = %err.as_report(), "recovery failed");
872            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
873                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
874            )]);
875            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
876        }))
877            .instrument(tracing::info_span!("recovery_attempt"))
878            .await
879            .expect("Retry until recovery success.");
880
881        let duration = recovery_timer.stop_and_record();
882
883        (
884            self.active_streaming_nodes,
885            self.control_stream_manager,
886            self.checkpoint_control,
887            self.term_id,
888        ) = new_state;
889
890        tracing::info!("recovery success");
891
892        let recovering_databases = self
893            .checkpoint_control
894            .recovering_databases()
895            .map(|database| database.database_id)
896            .collect_vec();
897        let running_databases = self
898            .checkpoint_control
899            .running_databases()
900            .map(|database| database.database_id)
901            .collect_vec();
902
903        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
904            EventRecovery::global_recovery_success(
905                recovery_reason.clone(),
906                duration as f32,
907                running_databases,
908                recovering_databases,
909            ),
910        )]);
911
912        self.env
913            .notification_manager()
914            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
915    }
916}