risingwave_meta/barrier/worker.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
    RecoveryReason, schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
/// collects them, with monotonically increasing epoch numbers. On compute nodes, the `LocalBarrierManager`
/// in the `risingwave_stream` crate serves these requests and dispatches them to source actors.
///
/// Configuration changes in our system are achieved by the mutation carried in the barrier. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state machine,
/// accepting [`crate::barrier::command::Command`]s that carry the info to build the `Mutation`. To keep
/// the barrier manager and the meta store consistent, actions like "drop materialized view" or
/// "create mv on mv" must be done transactionally in the barrier manager using [`crate::barrier::command::Command`].
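///
/// An illustrative, non-compiling sketch of the intended flow (the scheduler handle and the
/// command value here are hypothetical names, not part of this module):
///
/// ```ignore
/// // A DDL handler schedules a `Command`; the worker embeds it into the next barrier's
/// // `Mutation` and broadcasts it to every compute node, so the meta-store change and the
/// // barrier that carries it take effect together.
/// barrier_scheduler.run_command(database_id, drop_mv_command).await?;
/// ```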
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to enable recovery on failover.
    enable_recovery: bool,

    /// The queue of scheduled barriers.
    periodic_barriers: PeriodicBarriers,

    /// The maximum number of barriers in flight.
    in_flight_barrier_nums: usize,

    /// Whether per-database failure isolation is enabled in the system parameters.
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    sink_manager: SinkCoordinatorManager,

    control_stream_manager: ControlStreamManager,

    term_id: String,
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;
        let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::uninitialized(metadata_manager.clone());

        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
        ));

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let checkpoint_frequency = reader.checkpoint_frequency() as _;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
        let periodic_barriers = PeriodicBarriers::new(interval, checkpoint_frequency);
        tracing::info!(
            "Starting barrier scheduler with: checkpoint_frequency={:?}",
            checkpoint_frequency,
        );

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            in_flight_barrier_nums,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }

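    /// Spawns the worker's event loop onto the tokio runtime and returns its join handle
    /// together with a one-shot shutdown sender. Illustrative caller-side sketch
    /// (hypothetical variable names):
    ///
    /// ```ignore
    /// let (join_handle, shutdown_tx) = worker.start();
    /// // ... later, on meta node shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.ok();
    /// ```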
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }

    /// Check whether we should pause on bootstrap from the system parameter and reset it.
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                 It will now be reset to `false`. \
                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

    /// Start an infinite loop to take scheduled barriers and send them.
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            // Bootstrap recovery. Here we simply trigger a recovery process to achieve
            // consistency.
            // Even if there's no actor to recover, we still go through the recovery process to
            // inject the first `Initial` barrier.
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
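    /// Per-database failure isolation is effective only when both the `per_database_isolation`
    /// system parameter is set and the `DatabaseFailureIsolation` license feature is available;
    /// otherwise a failure in any database escalates to a global recovery.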
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

    async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;

        // Start the event loop.
        loop {
            tokio::select! {
                biased;

                // Shutdown
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            // Handle adhoc recovery triggered by user.
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
                        self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_checkpoint_frequency(p.checkpoint_frequency() as usize);
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                        }
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await? {
                                    Some(runtime_info) => {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        let workers = InflightFragmentInfo::workers(runtime_info.job_infos.values().flat_map(|job| job.fragment_infos()));
                                        for worker_id in workers {
                                            if !self.control_stream_manager.is_connected(worker_id) {
                                                self.control_stream_manager.try_reconnect_worker(worker_id, entering_initializing.control().inflight_infos(), self.term_id.clone(), &*self.context).await;
                                            }
                                        }
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    _ => {
                                        info!(database_id = database_id.database_id, "database removed after reloading empty runtime info");
                                        // mark ready to unblock subsequent request
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, resp_result) = self.control_stream_manager.next_response() => {
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(worker_id, database_id = database_id.database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            // TODO: add log on blocking time
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = DatabaseId::new(resp.database_id);
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(database_id = database_id.database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    // TODO: add log on blocking time
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("got unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context),
                    if self
                        .checkpoint_control
                        .can_inject_barrier(self.in_flight_barrier_nums) => {
                    if let Some((_, Command::CreateStreamingJob { info, .. }, _)) = &new_barrier.command {
                        let worker_ids: HashSet<_> =
                            info.stream_job_fragments.inner
                            .actors_to_create()
                            .flat_map(|(_, _, actors)|
                                actors.map(|(_, worker_id)| worker_id)
                            )
                            .collect();
                        for worker_id in worker_ids {
                            if !self.control_stream_manager.is_connected(worker_id) {
                                self.control_stream_manager.try_reconnect_worker(worker_id, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                            }
                        }
                    }
                    if let Some(failed_databases) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager)
                        && !failed_databases.is_empty() {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier for some databases but recovery not enabled: {:?}",
                                failed_databases.iter().map(|(database_id, e)| (database_id, e.as_report())).collect_vec()
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let errs = failed_databases.iter().map(|(database_id, e)| (database_id, e.as_report())).collect_vec();
                                let err = anyhow!("failed to inject barrier for databases: {:?}", errs);
                                Err(err)?;
                            } else {
                                for (database_id, err) in failed_databases {
                                    if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                        warn!(%database_id, e = %err.as_report(), "database entering recovery on inject failure");
                                        self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(err).context("inject barrier failure").into()));
                                        // TODO: add log on blocking time
                                        let output = self.completing_task.wait_completing_task().await?;
                                        entering_recovery.enter(output, &mut self.control_stream_manager);
                                    }
                                }
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// We need to make sure there are no changes when doing recovery.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // Join the spawned completing command, whether it succeeds or not.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // Continue to finish the pending collected barriers.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Clear the current state and enter recovery on a streaming error, or panic if recovery
    /// is disabled.
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            // No need to clean dirty tables for barrier recovery;
            // the foreground stream jobs should clean up their own tables.
            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        // No need to clean dirty tables for barrier recovery;
        // the foreground stream jobs should clean up their own tables.
        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
    /// Record a barrier collection failure from a compute node in the event log.
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        // Record failure in event log.
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Retry base interval in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Retry max interval.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    mod retry_backoff_future {
        use std::future::Future;
        use std::time::Duration;

        use tokio::time::sleep;

        pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
        pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
            Box::pin(sleep(duration))
        }
    }
    pub(crate) use retry_backoff_future::*;

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    #[inline(always)]
    /// Initialize a retry strategy for operations during recovery.
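    ///
    /// A rough usage sketch (illustrative only, not a doctest; `do_recovery_step` is a
    /// hypothetical async operation):
    ///
    /// ```ignore
    /// // Delays grow exponentially from the 20 ms base with jitter applied,
    /// // capped at 5 s per attempt.
    /// let result = tokio_retry::Retry::spawn(get_retry_strategy(), || do_recovery_step()).await;
    /// ```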
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::controller::fragment::InflightFragmentInfo;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Recover the whole cluster from the latest epoch.
    ///
    /// If `is_paused` is `true`, all data sources (including connectors and DMLs) will be
    /// immediately paused after recovery, until the user manually resumes them either by
    /// restarting the cluster or via the `risectl` command. Used for debugging purposes.
    ///
    /// On return, the barrier manager has been reset to the newly recovered state.
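    ///
    /// Illustrative only; this mirrors how the worker itself invokes it, e.g. for ad-hoc recovery:
    ///
    /// ```ignore
    /// self.recovery(false, RecoveryReason::Adhoc).await;
    /// ```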
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Clear all control streams to release resources (connections to compute nodes) first.
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };

        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        // We take retries into account because this is the latency the user sees for the cluster
        // to get recovered.
        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            // We need to notify_creating_job_failed on every recovery retry: the outer
            // create_streaming_job handler holds the reschedule_read_lock while waiting for the
            // creating job to finish, which makes the subsequent scale_actor fail to acquire the
            // reschedule_write_lock, keeps recovery looping, and ends in a deadlock.
            // TODO: refactor and fix this hacky implementation.
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut subscription_infos,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
            } = runtime_info_snapshot;

            self.sink_manager.reset().await;
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
            let reset_start_time = Instant::now();
            let unconnected_worker = control_stream_manager
                .reset(
                    active_streaming_nodes.current(),
                    term_id.clone(),
                    &*self.context,
                )
                .await;
            info!(elapsed=?reset_start_time.elapsed(), ?unconnected_worker, "control stream reset");

            {
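                // Rebuild the fragment edge info for all recovered jobs, then inject the initial
                // barrier into each database independently, tracking the databases that fail.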
                let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|info| info.values().flatten()), &control_stream_manager);
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                for (database_id, jobs) in database_job_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        subscription_infos.remove(&database_id).unwrap_or_default(),
                        is_paused,
                        &hummock_version_stats,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(database_id = database_id.database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
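                // Drive the control streams until every database has either collected its
                // initial barrier from all involved workers or has been marked as failed.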
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response().await;
                    let resp = match result {
                        Err(e) => {
                            warn!(worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, worker_id, "database failed to recover in global recovery due to worker node err");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = DatabaseId::new(resp.database_id);
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("get unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id as WorkerId);
                    let database_id = DatabaseId::new(resp.database_id);
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        // ignore the late-arriving collect resp of a failed database
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mviews in recovery");
                }
                if !subscription_infos.is_empty() {
                    warn!(?subscription_infos, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.database_id).collect_vec()).into()
                    );
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );
                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
            .instrument(tracing::info_span!("recovery_attempt"))
            .await
            .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.database_id)
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.database_id)
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}