risingwave_meta/barrier/worker.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, WorkerNodeEvent, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, RecoveryReason,
    schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes
/// and collects them, with monotonically increasing epoch numbers. On compute nodes, the
/// `LocalBarrierManager` in the `risingwave_stream` crate serves these requests and dispatches them
/// to source actors.
///
/// Configuration changes in our system are achieved by mutations carried in barriers. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state
/// machine, accepting [`crate::barrier::command::Command`]s that carry the info to build a
/// `Mutation`. To keep the barrier manager and the meta store consistent, actions like "drop
/// materialized view" or "create mv on mv" must be done in the barrier manager transactionally
/// using [`crate::barrier::command::Command`].
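///
/// A minimal sketch of how the worker is typically wired up and driven (the surrounding
/// managers are assumed to be constructed elsewhere; not a runnable doctest):
///
/// ```ignore
/// let (_request_tx, request_rx) = tokio::sync::mpsc::unbounded_channel();
/// let worker = GlobalBarrierWorker::new(
///     scheduled_barriers, env, metadata_manager, hummock_manager, source_manager,
///     sink_manager, scale_controller, request_rx, barrier_scheduler, refresh_manager,
/// )
/// .await;
/// let (join_handle, shutdown_tx) = worker.start();
/// // ... later, on meta node shutdown:
/// let _ = shutdown_tx.send(());
/// let _ = join_handle.await;
/// ```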
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to enable recovery on failover.
    enable_recovery: bool,

    /// The queue of scheduled barriers.
    periodic_barriers: PeriodicBarriers,

    /// Whether per-database failure isolation is enabled in the system parameters.
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    control_stream_manager: ControlStreamManager,

    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        // The actual barrier config will be loaded in the bootstrap phase.
        let periodic_barriers = PeriodicBarriers::default();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
        refresh_manager: GlobalRefreshManagerRef,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
            refresh_manager,
            sink_manager,
        ));

        Self::new_inner(env, request_rx, context).await
    }

    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

    /// Check whether we should pause on bootstrap from the system parameter and reset it.
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap()
            || self.env.opts.pause_on_next_bootstrap_offline;

        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                 It will now be reset to `false`. \
                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

    /// Start an infinite loop to take scheduled barriers and send them.
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            // Bootstrap recovery. Here we simply trigger a recovery process to achieve
            // consistency.
            // Even if there's no actor to recover, we still go through the recovery process to
            // inject the first `Initial` barrier.
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        // Start the event loop.
        loop {
            tokio::select! {
                biased;

                // Shutdown
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_cdc_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get cdc progress");
                                }
                            }
                            // Handle adhoc recovery triggered by user.
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            } => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    match changed_worker {
                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
                            self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                        }
                        ActiveStreamingWorkerChange::Remove(node) => {
                            self.control_stream_manager.remove_worker(node);
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                        }
                    }
                }
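                // Poll the in-flight barrier completing task: acknowledge the completed
                // barriers on success, or fall back to full failure recovery on error.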
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
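                // Handle per-database state transitions from checkpoint control: reload the
                // runtime info when a database enters initializing, and mark it ready again
                // when it re-enters running.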
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await.and_then(|runtime_info| {
                                    runtime_info.map(|runtime_info| {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(%database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        Ok(runtime_info)
                                    })
                                    .transpose()
                                }) {
                                    Ok(Some(runtime_info)) => {
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    Ok(None) => {
                                        info!(%database_id, "database removed after reloading empty runtime info");
                                        // mark ready to unblock subsequent request
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                    Err(e) => {
                                        entering_initializing.fail_reload_runtime_info(e);
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
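                // Handle events from compute-node control streams: collected barriers,
                // reported database failures and database resets. On worker errors, only the
                // affected databases are reset when per-database isolation is enabled;
                // otherwise the error escalates to a full recovery.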
                (worker_id, event) = self.control_stream_manager.next_event(&self.term_id, &self.context) => {
                    let resp_result = match event {
                        WorkerNodeEvent::Response(result) => {
                            result
                        }
                        WorkerNodeEvent::Connected(connected) => {
                            connected.initialize(self.checkpoint_control.inflight_infos());
                            continue;
                        }
                    };
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            // TODO: add log on blocking time
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = resp.database_id;
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(%database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    // TODO: add log on blocking time
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("got unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
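                // Inject the next scheduled barrier. On injection failure, reset the affected
                // database (with per-database isolation) or trigger a full recovery.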
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                // TODO: add log on blocking time
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Ensure that no barrier completion is still in flight before recovery starts: join the
    /// spawned completing task and, if it did not fail, drain the pending collected barriers,
    /// then clear the checkpoint control state with the given error.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // Join the spawned completing task, regardless of whether it succeeds.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // Continue to finish the pending collected barriers.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Clear the in-flight completing state and trigger a full recovery for the given error.
    /// Panics if recovery is disabled.
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            // No need to clean dirty tables for barrier recovery;
            // the foreground stream jobs should clean up their own tables.
            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        // No need to clean dirty tables for barrier recovery;
        // the foreground stream jobs should clean up their own tables.
        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
    /// Record a barrier collection failure in the event log.
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Retry base interval in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Retry max interval.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    // MrCroxx: Use concrete type here to prevent an unresolved compiler issue.
    // Feel free to replace the concrete type with TAIT after it is fixed.

    // mod retry_backoff_future {
    //     use std::future::Future;
    //     use std::time::Duration;
    //
    //     use tokio::time::sleep;
    //
    //     pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
    //
    //     #[define_opaque(RetryBackoffFuture)]
    //     pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
    //         Box::pin(sleep(duration))
    //     }
    // }
    // pub(crate) use retry_backoff_future::*;

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// Initialize a retry strategy for operations during recovery.
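    ///
    /// A minimal usage sketch, mirroring how `recovery_inner` drives retries below
    /// (`try_recover_once` is a hypothetical fallible async operation, not part of this crate):
    ///
    /// ```ignore
    /// let outcome = tokio_retry::Retry::spawn(get_retry_strategy(), || async {
    ///     try_recover_once().await
    /// })
    /// .await;
    /// ```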
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Recover the whole cluster from the latest epoch.
    ///
    /// If `is_paused` is `true`, all data sources (including connectors and DMLs) will be
    /// immediately paused after recovery, until the user manually resumes them either by
    /// restarting the cluster or via a `risectl` command. Used for debugging purposes.
    ///
    /// On success, the worker's in-memory state is replaced with the newly recovered state.
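    ///
    /// For example, bootstrap recovery (see `run`) is triggered as:
    ///
    /// ```ignore
    /// self.recovery(paused, RecoveryReason::Bootstrap)
    ///     .instrument(span)
    ///     .await;
    /// ```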
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Clear all control streams to release resources (connections to compute nodes) first.
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        // We take retries into account because this is the latency the user sees for the cluster
        // to get recovered.
        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            // We need to call notify_creating_job_failed on every recovery retry: the outer
            // create_streaming_job handler holds the reschedule_read_lock while waiting for the
            // creating job to finish, which makes the subsequent scale_actor fail to acquire the
            // reschedule_write_lock, keeping recovery retrying and ending in a deadlock.
            // TODO: refactor and fix this hacky implementation.
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;

            let runtime_info_snapshot = self.context.reload_runtime_info().await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut mv_depended_subscriptions,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_splits,
            } = runtime_info_snapshot;

            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::recover(
                self.env.clone(),
                active_streaming_nodes.current(),
                &term_id,
                self.context.clone(),
            )
            .await;

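            // Build fragment edges once, then inject the initial barrier into every database.
            // A per-database failure here is recorded in `failed_databases` instead of aborting
            // the whole retry attempt (subject to the per-database isolation check below).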
            {
                let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|jobs| jobs.values().flat_map(|fragments| fragments.values())), &control_stream_manager);
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                for (database_id, jobs) in database_job_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        &mut mv_depended_subscriptions,
                        is_paused,
                        &hummock_version_stats,
                        &mut cdc_table_snapshot_splits,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(%database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
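                // Collect the initial barrier from every database. Worker errors or reported
                // database failures downgrade the affected databases to `failed_databases`
                // rather than failing the whole attempt immediately.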
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response(&term_id, &self.context).await;
                    let resp = match result {
                        Err(e) => {
                            warn!(%worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, %worker_id, "database failed to recover in global recovery due to worker node err");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = resp.database_id;
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, %worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("got unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id);
                    let database_id = resp.database_id;
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        // Ignore the late-arriving collect resp of a failed database.
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mview in recovery");
                }
                if !mv_depended_subscriptions.is_empty() {
                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.as_raw_id()).collect_vec()).into()
                    );
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
            .instrument(tracing::info_span!("recovery_attempt"))
            .await
            .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}