risingwave_meta/barrier/worker.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
    RecoveryReason, schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
/// collects them, with monotonically increasing epoch numbers. On compute nodes, the `LocalBarrierManager`
/// in the `risingwave_stream` crate serves these requests and dispatches them to source actors.
///
/// Configuration changes in our system are achieved by mutations carried in barriers. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state machine,
/// accepting [`crate::barrier::command::Command`]s that carry the info to build a `Mutation`. To keep the consistency between
/// the barrier manager and the meta store, some actions like "drop materialized view" or "create mv on mv"
/// must be done transactionally in the barrier manager using [`crate::barrier::command::Command`].
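///
/// A condensed sketch of the main event loop in `run_inner` (all names are methods
/// actually used in this file; control flow is heavily simplified):
///
/// ```ignore
/// loop {
///     select! {
///         // inject the next scheduled or periodic barrier into compute nodes
///         new_barrier = periodic_barriers.next_barrier(..) =>
///             checkpoint_control.handle_new_barrier(new_barrier, ..),
///         // collect barrier-completion responses from compute nodes
///         (worker_id, resp) = control_stream_manager.next_response() =>
///             checkpoint_control.barrier_collected(resp, ..),
///         // commit fully-collected barriers and acknowledge them
///         output = completing_task.next_completed_barrier(..) =>
///             checkpoint_control.ack_completed(output),
///     }
/// }
/// ```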
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to enable recovery on failover.
    enable_recovery: bool,

    /// The queue of scheduled barriers.
    periodic_barriers: PeriodicBarriers,

    /// Whether per-database failure isolation is enabled in the system parameters.
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    sink_manager: SinkCoordinatorManager,

    control_stream_manager: ControlStreamManager,

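    /// Identifier of the current term, i.e. the lifetime between two recoveries.
    /// Starts as `"uninitialized"` and is regenerated as a random UUID on each recovery.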
    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        sink_manager: SinkCoordinatorManager,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        // Loading the config will be performed in the bootstrap phase.
        let periodic_barriers = PeriodicBarriers::default();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
        ));

        Self::new_inner(env, sink_manager, request_rx, context).await
    }

    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

    /// Check whether we should pause on bootstrap from the system parameter and reset it.
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                 It will now be reset to `false`. \
                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

    /// Start an infinite loop to take scheduled barriers and send them.
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            // Bootstrap recovery. Here we simply trigger a recovery process to achieve
            // consistency.
            // Even if there's no actor to recover, we still go through the recovery process to
            // inject the first `Initial` barrier.
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
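    /// Per-database failure isolation takes effect only when it's enabled in the
    /// system parameters *and* the `DatabaseFailureIsolation` license feature is available.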
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        // Start the event loop.
        loop {
            tokio::select! {
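                // `biased` polls the branches in declaration order rather than randomly,
                // so the shutdown signal below is always checked first.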
                biased;

                // Shutdown
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            // Handle adhoc recovery triggered by user.
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            } => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                        }
                    } else {
                        tracing::info!("request stream ended; the meta node may be shutting down. Stopping the global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
                        self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                        self.periodic_barriers
                            .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                        self.system_enable_per_database_isolation = p.per_database_isolation();
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
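                                // Rebuild each worker's reported root error (with its score)
                                // and merge them into a single error describing the database reset.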
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await? {
                                    Some(runtime_info) => {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        let workers = InflightFragmentInfo::workers(runtime_info.job_infos.values().flat_map(|job| job.fragment_infos()));
                                        for worker_id in workers {
                                            if !self.control_stream_manager.is_connected(worker_id) {
                                                self.control_stream_manager.try_reconnect_worker(worker_id, entering_initializing.control().inflight_infos(), self.term_id.clone(), &*self.context).await;
                                            }
                                        }
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    None => {
                                        info!(database_id = database_id.database_id, "database removed after reloading empty runtime info");
                                        // mark ready to unblock subsequent requests
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, resp_result) = self.control_stream_manager.next_response() => {
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(worker_id, database_id = database_id.database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            // TODO: add log on blocking time
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = DatabaseId::new(resp.database_id);
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(database_id = database_id.database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    // TODO: add log on blocking time
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("got unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    if let Some((Command::CreateStreamingJob { info, .. }, _)) = &new_barrier.command {
                        let worker_ids: HashSet<_> = info
                            .stream_job_fragments
                            .inner
                            .actors_to_create()
                            .flat_map(|(_, _, actors)| actors.map(|(_, worker_id)| worker_id))
                            .collect();
                        for worker_id in worker_ids {
                            if !self.control_stream_manager.is_connected(worker_id) {
                                self.control_stream_manager.try_reconnect_worker(worker_id, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                            }
                        }
                    }
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                // TODO: add log on blocking time
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// We need to make sure there are no in-flight changes when doing recovery.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // Join the spawned completing command to finish, no matter whether it succeeds or not.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // Continue to finish the pending collected barriers.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Clear the current state and enter failure recovery, or panic if recovery is disabled.
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            // No need to clean dirty tables for barrier recovery;
            // the foreground stream jobs should clean up their own tables.
            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        // No need to clean dirty tables for barrier recovery;
        // the foreground stream jobs should clean up their own tables.
        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
    /// Record a failure of barrier collection in the event log.
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Retry base interval in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Retry max interval.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    // MrCroxx: Use a concrete type here to work around an unresolved compiler issue.
    // Feel free to replace the concrete type with TAIT once it's fixed.

    // mod retry_backoff_future {
    //     use std::future::Future;
    //     use std::time::Duration;
    //
    //     use tokio::time::sleep;
    //
    //     pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
    //
    //     #[define_opaque(RetryBackoffFuture)]
    //     pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
    //         Box::pin(sleep(duration))
    //     }
    // }
    // pub(crate) use retry_backoff_future::*;

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// Initialize a retry strategy for operations in recovery.
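    ///
    /// A minimal sketch of the schedule this produces: delays grow exponentially
    /// from 20 ms, are capped at 5 s, and each is then scaled down by a random jitter.
    ///
    /// ```ignore
    /// for delay in get_retry_strategy().take(5) {
    ///     assert!(delay <= RECOVERY_RETRY_MAX_INTERVAL);
    /// }
    /// ```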
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::controller::fragment::InflightFragmentInfo;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Recover the whole cluster from the latest epoch.
    ///
    /// If `is_paused` is true, all data sources (including connectors and DMLs) will be
    /// immediately paused after recovery, until the user manually resumes them either by
    /// restarting the cluster or via the `risectl` command. Used for debugging purposes.
    ///
    /// On success, the worker's state (active nodes, control streams, checkpoint control,
    /// etc.) is replaced with the newly recovered one.
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Clear all control streams to release resources (connections to compute nodes) first.
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        // We take retries into account because this is the latency the user sees for a
        // cluster to get recovered.
        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            // We need to call notify_creating_job_failed on every recovery retry: the outer
            // create_streaming_job handler holds the reschedule_read_lock while waiting for the
            // creating job to finish, which makes the subsequent scale_actor fail to acquire the
            // reschedule_write_lock, which keeps recovery retrying, resulting in a deadlock.
            // TODO: refactor and fix this hacky implementation.
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut subscription_infos,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_split_assignment,
            } = runtime_info_snapshot;

            self.sink_manager.reset().await;
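            // Each recovery attempt starts a fresh term id; the control streams reset below
            // are established under this new term.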
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
            let reset_start_time = Instant::now();
            let unconnected_worker = control_stream_manager
                .reset(
                    active_streaming_nodes.current(),
                    term_id.clone(),
                    &*self.context,
                )
                .await;
            info!(elapsed=?reset_start_time.elapsed(), ?unconnected_worker, "control stream reset");

            {
                let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|info| info.values().flatten()), &control_stream_manager);
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                for (database_id, jobs) in database_job_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        subscription_infos.remove(&database_id).unwrap_or_default(),
                        is_paused,
                        &hummock_version_stats,
                        &mut cdc_table_snapshot_split_assignment,
                    );
                    let node_to_collect = match result {
                        Ok(info) => info,
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(database_id = database_id.database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
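                // Drain responses until every database's initial barrier is either fully
                // collected or the database is marked as failed.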
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response().await;
                    let resp = match result {
                        Err(e) => {
                            warn!(worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, worker_id, "database failed to recover in global recovery due to worker node error");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = DatabaseId::new(resp.database_id);
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("got unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id as WorkerId);
                    let database_id = DatabaseId::new(resp.database_id);
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        // Ignore the late-arriving collect resp of a failed database.
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mview in recovery");
                }
                if !subscription_infos.is_empty() {
                    warn!(?subscription_infos, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases
                            .iter()
                            .map(|database_id| database_id.database_id)
                            .collect_vec()
                    )
                    .into());
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
            .instrument(tracing::info_span!("recovery_attempt"))
            .await
            .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.database_id)
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.database_id)
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}