risingwave_meta/barrier/
worker.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, WorkerNodeEvent, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, RecoveryReason,
    schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
/// collects them, with monotonically increasing epoch numbers. On compute nodes, the `LocalBarrierManager`
/// in the `risingwave_stream` crate serves these requests and dispatches them to source actors.
///
/// Configuration changes in our system are achieved by mutations carried in barriers. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state machine,
/// accepting [`crate::barrier::command::Command`] that carries the info to build a `Mutation`. To keep the consistency between
/// the barrier manager and the meta store, actions like "drop materialized view" or "create mv on mv"
/// must be done transactionally in the barrier manager using [`crate::barrier::command::Command`].
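///
/// A rough sketch of the flow described above (illustrative only, not a precise call graph):
///
/// ```text
/// scheduled Command ──> GlobalBarrierWorker (meta)
///     injects a barrier (carrying the Mutation) via the control streams to compute nodes
///         ──> LocalBarrierManager ──> source actors
///     collects barrier-complete responses from all compute nodes
///         ──> completes the barrier and commits the epoch
/// ```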
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to enable recovery on failover.
    enable_recovery: bool,

    /// The queue of scheduled barriers.
    periodic_barriers: PeriodicBarriers,

    /// Whether per-database failure isolation is enabled in the system parameters.
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    sink_manager: SinkCoordinatorManager,

    control_stream_manager: ControlStreamManager,

    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        sink_manager: SinkCoordinatorManager,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        // Loading the config will be performed in the bootstrap phase.
        let periodic_barriers = PeriodicBarriers::default();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
        ));

        Self::new_inner(env, sink_manager, request_rx, context).await
    }

    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

    /// Check whether we should pause on bootstrap from the system parameter and reset it.
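    /// The parameter is typically set beforehand by the user (e.g. via something like
    /// `ALTER SYSTEM SET pause_on_next_bootstrap = true`; the exact SQL form may differ), and is
    /// reset here so that it only takes effect for a single bootstrap.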
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                 It will now be reset to `false`. \
                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

    /// Start an infinite loop to take scheduled barriers and send them.
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            // Bootstrap recovery. Here we simply trigger a recovery process to achieve
            // consistency.
            // Even if there's no actor to recover, we still go through the recovery process to
            // inject the first `Initial` barrier.
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
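    /// Per-database failure isolation takes effect only when both the corresponding system
    /// parameter is enabled and the `DatabaseFailureIsolation` license feature is available.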
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        // Start the event loop.
        loop {
            tokio::select! {
                biased;

                // Shutdown
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            // Handle adhoc recovery triggered by the user.
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            } => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    match changed_worker {
                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
                            self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                        }
                        ActiveStreamingWorkerChange::Remove(node) => {
                            self.control_stream_manager.remove_worker(node);
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                        }
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await? { Some(runtime_info) => {
                                    runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                        warn!(%database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                    })?;
                                    entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                } _ => {
                                    info!(%database_id, "database removed after reloading empty runtime info");
                                    // mark ready to unblock subsequent requests
                                    self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                    entering_initializing.remove();
                                }}
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, event) = self.control_stream_manager.next_event(&self.term_id, &self.context) => {
                    let resp_result = match event {
                        WorkerNodeEvent::Response(result) => {
                            result
                        }
                        WorkerNodeEvent::Connected(connected) => {
                            connected.initialize(self.checkpoint_control.inflight_infos());
                            continue;
                        }
                    };
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(worker_id, %database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            // TODO: add log on blocking time
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = resp.database_id;
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(%database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    // TODO: add log on blocking time
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("get unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                // TODO: add log on blocking time
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Make sure there are no in-flight changes when entering recovery: join the completing
    /// task, finish the pending collected barriers if possible, and then clear the checkpoint
    /// control state with the given error.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // Join the spawned completing command to finish, whether it succeeds or not.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // Continue to finish the pending collected barriers.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Clear the in-flight state and trigger a full recovery on a streaming error, or panic if
    /// recovery is disabled.
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            // No need to clean dirty tables for barrier recovery;
            // the foreground stream jobs should clean up their own tables.
            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        // No need to clean dirty tables for barrier recovery;
        // the foreground stream jobs should clean up their own tables.
        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
    /// Record a barrier collection failure in the event log.
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        // Record failure in event log.
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Retry base interval in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Retry max interval.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    // MrCroxx: Use concrete type here to prevent an unresolved compiler issue.
    // Feel free to replace the concrete type with TAIT after it is fixed.

    // mod retry_backoff_future {
    //     use std::future::Future;
    //     use std::time::Duration;
    //
    //     use tokio::time::sleep;
    //
    //     pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
    //
    //     #[define_opaque(RetryBackoffFuture)]
    //     pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
    //         Box::pin(sleep(duration))
    //     }
    // }
    // pub(crate) use retry_backoff_future::*;

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// Initialize a retry strategy for operations in recovery.
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }
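
    // A rough sketch of the resulting schedule (assuming `tokio_retry`'s usual semantics, where
    // `ExponentialBackoff::from_millis(b)` yields delays of `b^1, b^2, b^3, ...` milliseconds):
    // the pre-jitter delays grow roughly as 20ms, 400ms, 8s, ... and are capped at
    // `RECOVERY_RETRY_MAX_INTERVAL` (5s); `jitter` then randomizes each delay so that concurrent
    // retries do not synchronize. For example:
    //
    //     let mut delays = get_retry_strategy();
    //     let d1 = delays.next().unwrap(); // randomized, on the order of the 20ms base interval
    //     let d2 = delays.next().unwrap(); // randomized, on the order of the next (larger) step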

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Recover the whole cluster from the latest epoch.
    ///
    /// If `is_paused` is true, all data sources (including connectors and DMLs) will be
    /// immediately paused after recovery, until the user manually resumes them either by
    /// restarting the cluster or via the `risectl` command. Used for debugging purposes.
    ///
    /// On success, the worker's state (checkpoint control, control streams, etc.) is replaced
    /// with the newly recovered one.
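    ///
    /// Roughly, recovery proceeds as follows (see `recovery_inner` for details):
    /// 1. Clear the existing control streams and mark the cluster as blocked.
    /// 2. In a retry loop: reload and validate the runtime info snapshot, reset the sink
    ///    coordinator, re-establish control streams under a fresh `term_id`, and inject the
    ///    initial barrier for each database, collecting the responses.
    /// 3. Rebuild `CheckpointControl` and `PeriodicBarriers` from the collected state, mark the
    ///    cluster ready (except for databases that are still recovering), and notify the frontend
    ///    and compute nodes.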
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Clear all control streams to release resources (connections to compute nodes) first.
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        // We take retries into account because this is the latency the user sees for the cluster
        // to get recovered.
        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            // We need to call notify_creating_job_failed in every recovery retry, because the outer
            // create_streaming_job handler holds the reschedule_read_lock while waiting for the
            // creating job to finish, which causes the subsequent scale_actor to fail to acquire
            // the reschedule_write_lock, keep recovering, and eventually deadlock.
            // TODO: refactor and fix this hacky implementation.
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut mv_depended_subscriptions,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_split_assignment,
            } = runtime_info_snapshot;

            self.sink_manager.reset().await;
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::recover(
                    self.env.clone(),
                    active_streaming_nodes.current(),
                    &term_id,
                    self.context.clone(),
                )
                .await;

            {
                let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|jobs| jobs.values().flat_map(|fragments| fragments.values())), &control_stream_manager);
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                for (database_id, jobs) in database_job_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        &mut mv_depended_subscriptions,
                        is_paused,
                        &hummock_version_stats,
                        &mut cdc_table_snapshot_split_assignment,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(%database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response(&term_id, &self.context).await;
                    let resp = match result {
                        Err(e) => {
                            warn!(worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, worker_id, "database failed to recover during global recovery due to worker node error");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = resp.database_id;
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("get unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id as WorkerId);
                    let database_id = resp.database_id;
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        // ignore the late-arriving collect resp of a failed database
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mview in recovery");
                }
                if !mv_depended_subscriptions.is_empty() {
                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.as_raw_id()).collect_vec()).into()
                    );
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
            .instrument(tracing::info_span!("recovery_attempt"))
            .await
            .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
982}