risingwave_meta/barrier/worker.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
    RecoveryReason, schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
/// collects them, with monotonically increasing epoch numbers. On compute nodes, the `LocalBarrierManager`
/// in the `risingwave_stream` crate serves these requests and dispatches them to source actors.
///
/// Configuration changes in our system are achieved by the mutation carried in the barrier. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state machine,
/// accepting [`crate::barrier::command::Command`]s that carry the info to build a `Mutation`. To keep the consistency between
/// the barrier manager and the meta store, some actions like "drop materialized view" or "create mv on mv"
/// must be done in the barrier manager transactionally, using [`crate::barrier::command::Command`].
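///
/// The sketch below is illustrative only (simplified stand-in types, not the actual
/// RisingWave structures): it shows the in-flight barrier window this worker maintains,
/// where new barriers are injected only while fewer than `in_flight_barrier_nums` epochs
/// are awaiting collection, and collection happens in injection order.
///
/// ```ignore
/// use std::collections::VecDeque;
///
/// let in_flight_barrier_nums = 2;
/// let mut in_flight: VecDeque<u64> = VecDeque::new();
/// for epoch in 1..=4u64 {
///     // Inject only while the window has room, mirroring `can_inject_barrier`.
///     if in_flight.len() < in_flight_barrier_nums {
///         in_flight.push_back(epoch);
///     }
/// }
/// // Barriers are collected in the same (monotonically increasing) epoch order.
/// assert_eq!(in_flight.pop_front(), Some(1));
/// ```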
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to enable recovery on failover.
    enable_recovery: bool,

    /// The queue of scheduled barriers.
    periodic_barriers: PeriodicBarriers,

    /// The maximum number of in-flight barriers.
    in_flight_barrier_nums: usize,

    /// Whether per-database failure isolation is enabled in the system parameters.
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    sink_manager: SinkCoordinatorManager,

    control_stream_manager: ControlStreamManager,

    term_id: String,
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;
        let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::uninitialized(metadata_manager.clone());

        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
        ));

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let checkpoint_frequency = reader.checkpoint_frequency() as _;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
        let periodic_barriers = PeriodicBarriers::new(interval, checkpoint_frequency);
        tracing::info!(
            "Starting barrier scheduler with: checkpoint_frequency={:?}",
            checkpoint_frequency,
        );

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            in_flight_barrier_nums,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }

    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }
    /// Check whether we should pause on bootstrap from the system parameter and reset it.
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                 It will now be reset to `false`. \
                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

    /// Start an infinite loop to take scheduled barriers and send them.
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            // Bootstrap recovery. Here we simply trigger a recovery process to achieve
            // consistency.
            // Even if there's no actor to recover, we still go through the recovery process to
            // inject the first `Initial` barrier.
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
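    /// Per-database failure isolation takes effect only when both the system parameter is
    /// enabled and the `DatabaseFailureIsolation` license feature is available.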
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

    async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;

        // Start the event loop.
        loop {
            tokio::select! {
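                // `biased` makes `select!` poll the branches in declaration order instead of
                // randomly, so shutdown and external requests are checked before barrier events.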
                biased;

                // Shutdown
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            // Handle ad-hoc recovery triggered by the user.
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
                        self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_checkpoint_frequency(p.checkpoint_frequency() as usize);
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                        }
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await? { Some(runtime_info) => {
                                    runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                        warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                    })?;
                                    let workers = InflightFragmentInfo::workers(runtime_info.job_infos.values().flat_map(|job| job.fragment_infos()));
                                    for worker_id in workers {
                                        if !self.control_stream_manager.is_connected(worker_id) {
                                            self.control_stream_manager.try_reconnect_worker(worker_id, entering_initializing.control().inflight_infos(), self.term_id.clone(), &*self.context).await;
                                        }
                                    }
                                    entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                } _ => {
                                    info!(database_id = database_id.database_id, "database removed after reloading empty runtime info");
                                    // mark ready to unblock subsequent requests
                                    self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                    entering_initializing.remove();
                                }}
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, resp_result) = self.control_stream_manager.next_response() => {
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(worker_id, database_id = database_id.database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            // TODO: add log on blocking time
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = DatabaseId::new(resp.database_id);
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(database_id = database_id.database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    // TODO: add log on blocking time
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("get unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
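                // Only take a newly scheduled barrier when `can_inject_barrier` allows it, i.e.
                // the number of in-flight barriers is still within the configured limit.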
                new_barrier = self.periodic_barriers.next_barrier(&*self.context),
                    if self
                        .checkpoint_control
                        .can_inject_barrier(self.in_flight_barrier_nums) => {
                    if let Some((_, Command::CreateStreamingJob { info, .. }, _)) = &new_barrier.command {
                        let worker_ids: HashSet<_> =
                            info.stream_job_fragments.inner
                            .actors_to_create()
                            .flat_map(|(_, _, actors)|
                                actors.map(|(_, worker_id)| worker_id)
                            )
                            .collect();
                        for worker_id in worker_ids {
                            if !self.control_stream_manager.is_connected(worker_id) {
                                self.control_stream_manager.try_reconnect_worker(worker_id, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                            }
                        }
                    }
                    if let Some(failed_databases) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager)
                        && !failed_databases.is_empty() {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier for some databases but recovery not enabled: {:?}",
                                failed_databases.iter().map(|(database_id, e)| (database_id, e.as_report())).collect_vec()
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let errs = failed_databases.iter().map(|(database_id, e)| (database_id, e.as_report())).collect_vec();
                                let err = anyhow!("failed to inject barrier for databases: {:?}", errs);
                                Err(err)?;
                            } else {
                                for (database_id, err) in failed_databases {
                                    if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                        warn!(%database_id, e = %err.as_report(), "database entering recovery on inject failure");
                                        self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(err).context("inject barrier failure").into()));
                                        // TODO: add log on blocking time
                                        let output = self.completing_task.wait_completing_task().await?;
                                        entering_recovery.enter(output, &mut self.control_stream_manager);
                                    }
                                }
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Make sure there is no in-flight barrier completion before doing recovery:
    /// join the spawned completing task and drain the pending collected barriers.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // Join the spawned completing command whether it succeeds or not.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // Continue to finish the pending collected barriers.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Clear the in-progress state and, if recovery is enabled, run a failure recovery;
    /// otherwise, abort the process.
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            // No need to clean dirty tables for barrier recovery;
            // the foreground stream jobs should clean up their own tables.
            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        // No need to clean dirty tables for barrier recovery;
        // the foreground stream jobs should clean up their own tables.
        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
    /// Record a barrier collection failure in the event log.
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        // Record failure in event log.
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

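/// Retry/backoff helpers used by recovery.
///
/// The snippet below is an illustrative sketch (not part of this module's API) of the
/// backoff shape produced here, using the `tokio_retry` crate that this module already
/// depends on: delays grow exponentially from the 20ms base, are capped at 5s, and are
/// randomized by `jitter`.
///
/// ```ignore
/// use std::time::Duration;
/// use tokio_retry::strategy::{ExponentialBackoff, jitter};
///
/// // Before jitter, the delays are roughly 20ms, 400ms, then capped at 5s.
/// let delays: Vec<Duration> = ExponentialBackoff::from_millis(20)
///     .max_delay(Duration::from_secs(5))
///     .map(jitter)
///     .take(3)
///     .collect();
/// assert_eq!(delays.len(), 3);
/// ```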
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Retry base interval in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Retry max interval.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    mod retry_backoff_future {
        use std::future::Future;
        use std::time::Duration;

        use tokio::time::sleep;

        pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
        pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
            Box::pin(sleep(duration))
        }
    }
    pub(crate) use retry_backoff_future::*;

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    #[inline(always)]
    /// Initialize a retry strategy for operations in recovery.
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::controller::fragment::InflightFragmentInfo;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Recover the whole cluster from the latest epoch.
    ///
    /// If `is_paused` is `true`, all data sources (including connectors and DMLs) will be
    /// immediately paused after recovery, until the user manually resumes them either by
    /// restarting the cluster or via the `risectl` command. Used for debugging purposes.
    ///
    /// After this returns, the barrier manager has been reset to the newly recovered state.
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Clear all control streams to release resources (connections to compute nodes) first.
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        // We take retries into consideration because this is the latency the user sees for a
        // cluster to get recovered.
        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            // We need to call notify_creating_job_failed in every recovery retry, because the
            // outer create_streaming_job handler holds the reschedule_read_lock while waiting for
            // the creating job to finish, which makes the subsequent scale_actor fail to acquire
            // the reschedule_write_lock, keep recovering, and thus deadlock.
            // TODO: refactor and fix this hacky implementation.
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut subscription_infos,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
            } = runtime_info_snapshot;

            self.sink_manager.reset().await;
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
            let reset_start_time = Instant::now();
            let unconnected_worker = control_stream_manager
                .reset(
                    active_streaming_nodes.current(),
                    term_id.clone(),
                    &*self.context,
                )
                .await;
            info!(elapsed=?reset_start_time.elapsed(), ?unconnected_worker, "control stream reset");

            {
                let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|info| info.values().flatten()), &control_stream_manager);
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                for (database_id, jobs) in database_job_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        subscription_infos.remove(&database_id).unwrap_or_default(),
                        is_paused,
                        &hummock_version_stats,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(database_id = database_id.database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
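                // Wait until every database has either fully collected its initial barrier or
                // been marked as failed.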
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response().await;
                    let resp = match result {
                        Err(e) => {
                            warn!(worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, worker_id, "database failed to recover in global recovery due to worker node err");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = DatabaseId::new(resp.database_id);
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("get unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id as WorkerId);
                    let database_id = DatabaseId::new(resp.database_id);
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        // Ignore the late-arriving collect resp of a failed database.
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mview in recovery");
                }
                if !subscription_infos.is_empty() {
                    warn!(?subscription_infos, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.database_id).collect_vec()).into()
                    );
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );
                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
            .instrument(tracing::info_span!("recovery_attempt"))
            .await
            .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.database_id)
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.database_id)
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}