use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, WorkerNodeEvent, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, RecoveryReason,
    schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

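/// Global, meta-node-level worker that drives barrier injection, collection, and completion for
/// all databases, and rebuilds streaming runtime state through recovery when compute nodes or
/// streaming jobs fail.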
pub(super) struct GlobalBarrierWorker<C> {
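    /// Whether automatic recovery is enabled. When disabled, any streaming failure panics the
    /// meta node instead of triggering recovery.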
    enable_recovery: bool,

    periodic_barriers: PeriodicBarriers,

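    /// Cached `per_database_isolation` system parameter, refreshed on system-parameter change
    /// notifications and combined with a license check in `enable_per_database_isolation`.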
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    sink_manager: SinkCoordinatorManager,

    control_stream_manager: ControlStreamManager,

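    /// Identifier of the current worker term, regenerated on each global recovery and used to tag
    /// the control streams to compute nodes.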
    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
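    /// Construct a worker with uninitialized worker nodes and term id; the actual runtime state is
    /// established by the bootstrap recovery in `run`.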
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        sink_manager: SinkCoordinatorManager,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let periodic_barriers = PeriodicBarriers::default();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
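    /// Assemble the default `GlobalBarrierWorkerContextImpl` from the meta managers and delegate
    /// to `new_inner`.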
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
        refresh_manager: GlobalRefreshManagerRef,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
            refresh_manager,
        ));

        Self::new_inner(env, sink_manager, request_rx, context).await
    }

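    /// Spawn the worker loop onto the runtime, returning its join handle and a shutdown sender.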
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

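    /// Check whether the cluster should bootstrap with data sources paused (the
    /// `pause_on_next_bootstrap` system parameter), and reset the parameter to `false` if it was set.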
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                It will now be reset to `false`. \
                To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

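    /// Run a bootstrap recovery and then enter the main event loop. Panics on startup if recovery
    /// is disabled but streaming jobs already exist in the metadata.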
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                    or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
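    /// Per-database failure isolation is effective only when both the system parameter is enabled
    /// and the `DatabaseFailureIsolation` license feature is available.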
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

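    /// The main event loop: multiplexes shutdown, barrier manager requests, worker membership
    /// changes, system-parameter changes, barrier completion, checkpoint-control events,
    /// control-stream responses, and periodic barrier injection, entering recovery on failures.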
    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            } => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream; the meta node may be shutting down. Stopping the global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    match changed_worker {
                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
                            self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                        }
                        ActiveStreamingWorkerChange::Remove(node) => {
                            self.control_stream_manager.remove_worker(node);
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                        }
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                    ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(
                                    &format!("database {} reset", database_id),
                                    entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                        resp.root_err.as_ref().map(|root_err| {
                                            (*worker_id, ScoredError {
                                                error: Status::internal(&root_err.err_msg),
                                                score: Score(root_err.score),
                                            })
                                        })
                                    }),
                                );
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await? {
                                    Some(runtime_info) => {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(%database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    None => {
                                        info!(%database_id, "database removed after reloading empty runtime info");
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, event) = self.control_stream_manager.next_event(&self.term_id, &self.context) => {
                    let resp_result = match event {
                        WorkerNodeEvent::Response(result) => {
                            result
                        }
                        WorkerNodeEvent::Connected(connected) => {
                            connected.initialize(self.checkpoint_control.inflight_infos());
                            continue;
                        }
                    };
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = resp.database_id;
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(%database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("got unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to database but recovery not enabled: {:?}",
                                (database_id, e.as_report())
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to database: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
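    /// Drain or abort the in-flight completing task and any remaining complete-barrier tasks, then
    /// clear the checkpoint control state with the given error before recovery.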
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
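    /// Enter global recovery after a streaming error, or panic if recovery is disabled.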
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

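    /// Trigger a global recovery explicitly requested via `BarrierManagerRequest::AdhocRecovery`.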
    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
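    /// Record a barrier-collection failure in the meta event log.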
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

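    // Base interval (in milliseconds) and maximum delay for the exponential backoff between
    // recovery retry attempts.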
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

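    /// Exponential backoff with jitter, starting from `RECOVERY_RETRY_BASE_INTERVAL` ms and capped
    /// at `RECOVERY_RETRY_MAX_INTERVAL`.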
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
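    /// Run a global recovery: clear the control streams, mark all databases blocked, rebuild the
    /// runtime state via `recovery_inner`, and finally mark the cluster ready again, except for
    /// databases that are still recovering.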
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

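    /// Retry loop that reloads runtime info, re-establishes control streams with a fresh term id,
    /// injects the initial barrier for every database, and swaps the rebuilt state into `self`.
    /// Retries until recovery succeeds.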
    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut mv_depended_subscriptions,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_split_assignment,
            } = runtime_info_snapshot;

            self.sink_manager.reset().await;
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::recover(
                self.env.clone(),
                active_streaming_nodes.current(),
                &term_id,
                self.context.clone(),
            )
            .await;

            {
                let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|jobs| jobs.values().flat_map(|fragments| fragments.values())), &control_stream_manager);
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                for (database_id, jobs) in database_job_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        &mut mv_depended_subscriptions,
                        is_paused,
                        &hummock_version_stats,
                        &mut cdc_table_snapshot_split_assignment,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(%database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response(&term_id, &self.context).await;
                    let resp = match result {
                        Err(e) => {
                            warn!(%worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, %worker_id, "database failed to recover during global recovery due to worker node error");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = resp.database_id;
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, %worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("got unexpected response {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id);
                    let database_id = resp.database_id;
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background jobs in recovery");
                }
                if !mv_depended_subscriptions.is_empty() {
                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.as_raw_id()).collect_vec()
                    )
                    .into());
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"))
        .await
        .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}