risingwave_meta/barrier/worker.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{
    ControlStreamManager, WorkerNodeEvent, from_partial_graph_id, merge_node_rpc_errors,
};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, RecoveryReason,
    schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
/// collects them, with monotonically increasing epoch numbers. On compute nodes, the `LocalBarrierManager`
/// in the `risingwave_stream` crate serves these requests and dispatches them to source actors.
///
/// Configuration changes in our system are achieved by mutations carried in barriers. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state machine,
/// accepting [`crate::barrier::command::Command`] that carries the info to build a `Mutation`. To keep consistency between
/// the barrier manager and the meta store, actions like "drop materialized view" or "create mv on mv"
/// must be done in the barrier manager transactionally using [`crate::barrier::command::Command`].
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to enable recovery on failover.
    enable_recovery: bool,

    /// The queue of scheduled barriers.
    periodic_barriers: PeriodicBarriers,

    /// Whether per-database failure isolation is enabled in system parameters.
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    control_stream_manager: ControlStreamManager,

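    /// Identifier of the current barrier-manager term. It is regenerated as a fresh UUID on each
    /// global recovery and is attached to the control streams established for that term.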
    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        // The config will be loaded in the bootstrap phase.
        let periodic_barriers = PeriodicBarriers::default();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
        refresh_manager: GlobalRefreshManagerRef,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
            refresh_manager,
            sink_manager,
        ));

        Self::new_inner(env, request_rx, context).await
    }

    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

    /// Check whether we should pause on bootstrap from the system parameter and reset it.
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap()
            || self.env.opts.pause_on_next_bootstrap_offline;

        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                 It will now be reset to `false`. \
                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

    /// Start an infinite loop to take scheduled barriers and send them.
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            // Bootstrap recovery. Here we simply trigger a recovery process to achieve
            // consistency.
            // Even if there's no actor to recover, we still go through the recovery process to
            // inject the first `Initial` barrier.
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
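    /// Per-database failure isolation takes effect only when both the system parameter is enabled
    /// and the `DatabaseFailureIsolation` license feature is available.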
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        // Start the event loop.
        loop {
            tokio::select! {
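                // `biased` makes the branches below be polled in declaration order, so shutdown is
                // always checked before any other pending event.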
                biased;

                // Shutdown
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetBackfillProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_backfill_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send backfill progress");
                                }
                            }
                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_cdc_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send cdc progress");
                                }
                            }
                            // Handle adhoc recovery triggered by user.
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            } => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    match changed_worker {
                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
                            self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                        }
                        ActiveStreamingWorkerChange::Remove(node) => {
                            self.control_stream_manager.remove_worker(node);
                        }
                    }
                }

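                // Apply system parameter changes at runtime: barrier interval, checkpoint
                // frequency and the per-database isolation flag.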
                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                        }
                    }
                }
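                // Drive the in-progress barrier completion task; acknowledge completed barriers,
                // or fall back to failure recovery on error.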
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
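                // Per-database recovery state transitions reported by checkpoint control: either a
                // database is entering initialization (its runtime info is reloaded) or it is ready
                // to resume running.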
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await.and_then(|runtime_info| {
                                    runtime_info.map(|runtime_info| {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(%database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        Ok(runtime_info)
                                    })
                                    .transpose()
                                }) {
                                    Ok(Some(runtime_info)) => {
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    Ok(None) => {
                                        info!(%database_id, "database removed after reloading empty runtime info");
                                        // mark ready to unblock subsequent requests
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                    Err(e) => {
                                        entering_initializing.fail_reload_runtime_info(e);
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
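                // Events and responses from compute-node control streams: collected barriers,
                // partial-graph failures and resets. Worker errors trigger per-database recovery
                // when isolation is enabled, and global failure recovery otherwise.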
                (worker_id, event) = self.control_stream_manager.next_event(&self.term_id, &self.context) => {
                    let resp_result = match event {
                        WorkerNodeEvent::Response(result) => {
                            result
                        }
                        WorkerNodeEvent::Connected(connected) => {
                            connected.initialize(self.checkpoint_control.inflight_infos());
                            continue;
                        }
                    };
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            // TODO: add log on blocking time
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportPartialGraphFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", database_id))?;
                                }
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(%database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    // TODO: add log on blocking time
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetPartialGraph(resp) => {
                                self.checkpoint_control.on_reset_partial_graph_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("got unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
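                // Inject the next scheduled barrier. An injection failure enters per-database
                // recovery when isolation is enabled, otherwise it escalates to global recovery.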
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                // TODO: add log on blocking time
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// We need to make sure there are no in-flight changes when doing recovery.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // Join the spawned completing task no matter whether it succeeds or not.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // Continue to finish the pending collected barriers.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Clear the current state and enter failure recovery, or panic if recovery is disabled.
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            // No need to clean dirty tables for barrier recovery;
            // the foreground stream jobs should clean up their own tables.
            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        // No need to clean dirty tables for barrier recovery;
        // the foreground stream jobs should clean up their own tables.
        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
    /// Record a barrier collection failure in the event log.
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        // Record failure in event log.
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Retry base interval in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Retry max interval.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    // MrCroxx: Use a concrete type here to work around an unresolved compiler issue.
    // Feel free to replace the concrete type with TAIT after it is fixed.

    // mod retry_backoff_future {
    //     use std::future::Future;
    //     use std::time::Duration;
    //
    //     use tokio::time::sleep;
    //
    //     pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
    //
    //     #[define_opaque(RetryBackoffFuture)]
    //     pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
    //         Box::pin(sleep(duration))
    //     }
    // }
    // pub(crate) use retry_backoff_future::*;

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// Initialize a retry strategy for operations in recovery.
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_pb::meta::event_log::{Event, EventRecovery};

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Recover the whole cluster from the latest epoch.
    ///
    /// If `is_paused` is true, all data sources (including connectors and DMLs) will be
    /// immediately paused after recovery, until the user manually resumes them either by restarting
    /// the cluster or via the `risectl` command. Used for debugging purposes.
    ///
    /// On success, the worker's state (checkpoint control, control streams, etc.) is replaced with
    /// the newly recovered state.
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Clear all control streams to release resources (connections to compute nodes) first.
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
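        // Databases that are still recovering stay blocked; everything else is marked ready again.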
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        // We take retries into account because this is the latency the user sees for the cluster
        // to get recovered.
        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

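        // Retry the whole recovery procedure with exponential backoff until it succeeds.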
        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            // We need to call notify_creating_job_failed in every recovery retry, because the outer
            // create_streaming_job handler holds the reschedule_read_lock while waiting for the
            // creating job to finish. That makes the subsequent scale_actor fail to acquire the
            // reschedule_write_lock, which keeps recovery retrying and ends up in a deadlock.
            // TODO: refactor and fix this hacky implementation.
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;

            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut backfill_orders,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut mv_depended_subscriptions,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_splits,
            } = runtime_info_snapshot;

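            // Each recovery attempt gets a fresh term id, so control streams and responses that
            // belong to an older term can be told apart from the current one.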
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::recover(
                self.env.clone(),
                active_streaming_nodes.current(),
                &term_id,
                self.context.clone(),
            )
            .await;

            {
                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashMap::new();
                for (database_id, jobs) in database_job_infos {
                    let mut injected_creating_jobs = HashSet::new();
                    let database_backfill_orders = jobs.keys().map(|job_id| (*job_id, backfill_orders.remove(job_id).unwrap_or_default())).collect();
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        database_backfill_orders,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &fragment_relations,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        &mut mv_depended_subscriptions,
                        is_paused,
                        &hummock_version_stats,
                        &mut cdc_table_snapshot_splits,
                        &mut injected_creating_jobs,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id, injected_creating_jobs).is_none(), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(%database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
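                // Wait until every database has collected its initial barrier from all involved
                // workers. Databases hit by worker errors or reported resets are moved to
                // `failed_databases`.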
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response(&term_id, &self.context).await;
                    let resp = match result {
                        Err(e) => {
                            warn!(%worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, collector) in collecting_databases.extract_if(|_, collector| {
                                !collector.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, %worker_id, "database failed to recover in global recovery due to worker node error");
                                assert!(failed_databases.insert(failed_database_id, collector.creating_job_ids().collect()).is_none());
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportPartialGraphFailure(resp) => {
                                    let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
                                    if let Some(collector) = collecting_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id, collector.creating_job_ids().collect()).is_none());
                                    } else if let Some(database) = collected_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id, database.creating_streaming_job_controls.keys().copied().collect()).is_none());
                                    } else {
                                        assert!(failed_databases.contains_key(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetPartialGraph(_)) => {
                                    return Err(anyhow!("got unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id);
                    let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
                    if failed_databases.contains_key(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        // ignore the late-arriving collect resp of a failed database
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
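                // Entries left in these maps were not claimed by any database during injection;
                // log them so the inconsistency can be investigated.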
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background mview in recovery");
                }
                if !mv_depended_subscriptions.is_empty() {
                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.keys().collect_vec()).into()
                    );
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
            .instrument(tracing::info_span!("recovery_attempt"))
            .await
            .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}