risingwave_meta/barrier/
worker.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
    RecoveryReason, schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};
/// [`crate::barrier::worker::GlobalBarrierWorker`] sends barriers to all registered compute nodes and
/// collects them, with monotonically increasing epoch numbers. On compute nodes, `LocalBarrierManager`
/// in the `risingwave_stream` crate serves these requests and dispatches them to source actors.
///
/// Configuration changes in our system are achieved by mutations carried in barriers. Thus,
/// [`crate::barrier::worker::GlobalBarrierWorker`] provides a set of interfaces like a state machine,
/// accepting [`crate::barrier::command::Command`] that carries the info to build a `Mutation`. To keep the consistency between
/// the barrier manager and the meta store, actions like "drop materialized view" or "create mv on mv"
/// must be done in the barrier manager transactionally using [`crate::barrier::command::Command`].
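///
/// A hedged sketch of the flow (simplified; not the exact call graph):
///
/// ```ignore
/// // 1. A `Command` (e.g. dropping a streaming job) is scheduled.
/// // 2. The worker wraps it into a barrier carrying the derived `Mutation`
/// //    and injects the barrier into all involved compute nodes.
/// // 3. Once the barrier is collected from every node, the epoch is committed
/// //    and the corresponding meta store changes are applied together.
/// ```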
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to enable recovery on failover.
    enable_recovery: bool,

    /// The queue of scheduled barriers.
    periodic_barriers: PeriodicBarriers,

    /// The maximum number of in-flight barriers.
    in_flight_barrier_nums: usize,

    /// Whether per-database failure isolation is enabled in system parameters.
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// Command that has been collected but is still completing.
    /// The join handle of the completing future is stored.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    sink_manager: SinkCoordinatorManager,

    control_stream_manager: ControlStreamManager,

    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        sink_manager: SinkCoordinatorManager,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;
        let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        // Config loading will be performed in the bootstrap phase.
        let periodic_barriers = PeriodicBarriers::default();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            in_flight_barrier_nums,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    /// Create a new [`crate::barrier::worker::GlobalBarrierWorker`].
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
        ));

        Self::new_inner(env, sink_manager, request_rx, context).await
    }

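    /// Spawn the worker's event loop. Returns the join handle of the spawned
    /// task and a one-shot sender that triggers a graceful shutdown.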
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

    /// Check whether we should pause on bootstrap from the system parameter and reset it.
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                 It will now be reset to `false`. \
                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
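            // Hedged note: the reset above has roughly the same effect as a user
            // running `ALTER SYSTEM SET pause_on_next_bootstrap = 'false'` once the
            // flag has been consumed, so the next restart boots unpaused.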
        Ok(paused)
    }

    /// Start an infinite loop to take scheduled barriers and send them.
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            // Bootstrap recovery. Here we simply trigger a recovery process to achieve
            // consistency.
            // Even if there's no actor to recover, we still go through the recovery process to
            // inject the first `Initial` barrier.
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
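    /// Per-database failure isolation takes effect only when both the system
    /// parameter and the licensed `DatabaseFailureIsolation` feature are enabled.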
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;

        // Start the event loop.
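        // Arms below, in `biased` priority order:
        //   1. shutdown signal
        //   2. external requests (DDL progress, adhoc recovery, per-database barrier settings)
        //   3. worker node membership changes
        //   4. local notifications of system parameter changes
        //   5. completion of in-flight barrier-complete tasks
        //   6. per-database checkpoint state transitions
        //   7. responses from compute-node control streams
        //   8. injecting the next scheduled barrier, when under the in-flight limit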
        loop {
            tokio::select! {
                biased;

                // Shutdown
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            // Handle adhoc recovery triggered by user.
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            } => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
                        self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                        self.periodic_barriers
                            .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                        self.system_enable_per_database_isolation = p.per_database_isolation();
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
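                                // A database-level reset: aggregate the per-worker root errors,
                                // report the failure, reload this database's runtime info, and
                                // reconnect any disconnected workers that host its fragments
                                // before re-entering the initializing state.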
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await? {
                                    Some(runtime_info) => {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        let workers = InflightFragmentInfo::workers(runtime_info.job_infos.values().flat_map(|job| job.fragment_infos()));
                                        for worker_id in workers {
                                            if !self.control_stream_manager.is_connected(worker_id) {
                                                self.control_stream_manager.try_reconnect_worker(worker_id, entering_initializing.control().inflight_infos(), self.term_id.clone(), &*self.context).await;
                                            }
                                        }
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    _ => {
                                        info!(database_id = database_id.database_id, "database removed after reloading empty runtime info");
                                        // Mark ready to unblock subsequent requests.
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, resp_result) = self.control_stream_manager.next_response() => {
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
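                                // A control stream failed. With per-database isolation enabled,
                                // only the databases affected by that worker enter recovery;
                                // otherwise the error escalates to a global failure recovery.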
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(worker_id, database_id = database_id.database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            // TODO: add log on blocking time
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = DatabaseId::new(resp.database_id);
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(database_id = database_id.database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    // TODO: add log on blocking time
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("got unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context),
                    if self
                        .checkpoint_control
                        .can_inject_barrier(self.in_flight_barrier_nums) => {
                    if let Some((Command::CreateStreamingJob { info, .. }, _)) = &new_barrier.command {
                        let worker_ids: HashSet<_> =
                            info.stream_job_fragments.inner
                            .actors_to_create()
                            .flat_map(|(_, _, actors)|
                                actors.map(|(_, worker_id)| worker_id)
                            )
                            .collect();
                        for worker_id in worker_ids {
                            if !self.control_stream_manager.is_connected(worker_id) {
                                self.control_stream_manager.try_reconnect_worker(worker_id, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                            }
                        }
                    }
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                // TODO: add log on blocking time
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Make sure no barrier completion is still in progress before recovery:
    /// join the in-flight completing task, then drain barriers that have been
    /// collected but not yet completed.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // Join the spawned completing task, whether it succeeds or not.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // Continue to finish the pending collected barriers.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Clear the worker state on the given error, then run a failover recovery
    /// if recovery is enabled; panic otherwise.
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            // No need to clean dirty tables for barrier recovery;
            // foreground stream jobs should clean up their own tables.
            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        // No need to clean dirty tables for barrier recovery;
        // foreground stream jobs should clean up their own tables.
        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
    /// Record a barrier collection failure in the event log.
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Retry base interval in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Retry max interval.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);
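
    // Illustrative note (assuming `tokio_retry`'s exponential semantics of
    // base^attempt milliseconds): the nominal delays are 20ms, 400ms, then 8s
    // capped to 5s, and 5s thereafter, each scaled down by a random jitter
    // factor in [0, 1).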

    // MrCroxx: Use a concrete type here to work around an unresolved compiler issue.
    // Feel free to replace the concrete type with TAIT after it is fixed.

    // mod retry_backoff_future {
    //     use std::future::Future;
    //     use std::time::Duration;
    //
    //     use tokio::time::sleep;
    //
    //     pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
    //
    //     #[define_opaque(RetryBackoffFuture)]
    //     pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
    //         Box::pin(sleep(duration))
    //     }
    // }
    // pub(crate) use retry_backoff_future::*;

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// Initialize a retry strategy for operations in recovery.
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::controller::fragment::InflightFragmentInfo;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Recover the whole cluster from the latest epoch.
    ///
    /// If `is_paused` is `true`, all data sources (including connectors and DMLs) will be
    /// immediately paused after recovery, until the user manually resumes them, either by
    /// restarting the cluster or via a `risectl` command. Used for debugging purposes.
    ///
    /// On success, the barrier manager's state is replaced with the newly recovered state.
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Clear all control streams to release resources (connections to compute nodes) first.
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        // We take retries into account because this is the latency the user sees for a
        // cluster to get recovered.
        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

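        // Each attempt below: invalidate cached stream clients, reload runtime info, reset
        // the control streams under a fresh term id, inject an `Initial` barrier into every
        // database, and collect it from all involved workers. Databases that fail are either
        // retried globally (isolation off) or parked as recovering (isolation on).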
        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            // We need to notify_creating_job_failed in every recovery retry, because the outer
            // create_streaming_job handler holds the reschedule_read_lock while waiting for the
            // creating job to finish, which causes the subsequent scale_actor to fail to acquire
            // the reschedule_write_lock, leading to repeated recovery and eventually deadlock.
            // TODO: refactor and fix this hacky implementation.
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut subscription_infos,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
            } = runtime_info_snapshot;

            self.sink_manager.reset().await;
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
            let reset_start_time = Instant::now();
            let unconnected_worker = control_stream_manager
                .reset(
                    active_streaming_nodes.current(),
                    term_id.clone(),
                    &*self.context,
                )
                .await;
            info!(elapsed=?reset_start_time.elapsed(), ?unconnected_worker, "control stream reset");

            {
                let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|info| info.values().flatten()), &control_stream_manager);
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                for (database_id, jobs) in database_job_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        subscription_infos.remove(&database_id).unwrap_or_default(),
                        is_paused,
                        &hummock_version_stats,
                    );
                    let node_to_collect = match result {
                        Ok(info) => info,
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(database_id = database_id.database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response().await;
                    let resp = match result {
                        Err(e) => {
                            warn!(worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, worker_id, "database failed to recover in global recovery due to worker node error");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => resp,
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = DatabaseId::new(resp.database_id);
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("got unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id as WorkerId);
                    let database_id = DatabaseId::new(resp.database_id);
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        // Ignore the late-arriving collect resp of a failed database.
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background jobs in recovery");
                }
                if !subscription_infos.is_empty() {
                    warn!(?subscription_infos, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epochs in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.database_id).collect_vec()).into()
                    );
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
            .instrument(tracing::info_span!("recovery_attempt"))
            .await
            .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.database_id)
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.database_id)
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}
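
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::get_retry_strategy;

    // Hedged sanity check of the recovery backoff sketched in `retry_strategy`:
    // with a 20ms base and a 5s cap, every produced delay must stay within the
    // cap, since jitter only scales delays down.
    #[test]
    fn recovery_backoff_is_capped() {
        for delay in get_retry_strategy().take(16) {
            assert!(delay <= Duration::from_secs(5));
        }
    }
}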