1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19use std::time::Duration;
20
21use anyhow::anyhow;
22use arc_swap::ArcSwap;
23use futures::TryFutureExt;
24use itertools::Itertools;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_pb::meta::Recovery;
29use risingwave_pb::meta::subscribe_response::{Info, Operation};
30use risingwave_pb::stream_service::streaming_control_stream_response::Response;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tokio::sync::oneshot::{Receiver, Sender};
34use tokio::task::JoinHandle;
35use tokio::time::Instant;
36use tonic::Status;
37use tracing::{Instrument, debug, error, info, warn};
38use uuid::Uuid;
39
40use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
41use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
42use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
43use crate::barrier::rpc::{ControlStreamManager, merge_node_rpc_errors};
44use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
45use crate::barrier::{
46 BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, RecoveryReason,
47 schedule,
48};
49use crate::error::MetaErrorInner;
50use crate::hummock::HummockManagerRef;
51use crate::manager::sink_coordination::SinkCoordinatorManager;
52use crate::manager::{
53 ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
54 MetadataManager,
55};
56use crate::rpc::metrics::GLOBAL_META_METRICS;
57use crate::stream::{ScaleControllerRef, SourceManagerRef};
58use crate::{MetaError, MetaResult};
59
/// The singleton worker on the meta node that drives barrier injection and
/// collection across all streaming worker nodes, and orchestrates recovery
/// (global or per-database) when anything fails.
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether recovery is enabled. When `false`, any failure path panics
    /// instead of entering recovery (see `failure_recovery` / `run_inner`).
    enable_recovery: bool,

    /// Source of periodic barrier ticks; also tracks checkpoint frequency.
    /// Both are updated live on `SystemParamsChange` notifications.
    periodic_barriers: PeriodicBarriers,

    /// Upper bound on concurrently in-flight barriers; gates new injection.
    in_flight_barrier_nums: usize,

    /// When `true`, a single database's failure triggers per-database recovery;
    /// when `false`, it escalates to a global failure recovery.
    enable_per_database_isolation: bool,

    /// Shared context implementing `GlobalBarrierWorkerContext` (scheduling,
    /// metadata reload, mark-ready/blocked, etc.).
    pub(super) context: Arc<C>,

    /// Meta server environment (system params, notification manager, event log).
    env: MetaSrvEnv,

    /// Per-database barrier/checkpoint state machine.
    checkpoint_control: CheckpointControl,

    /// The barrier-completion task currently running (or `None` / a sticky error).
    completing_task: CompletingTask,

    /// Requests from the barrier manager handle (DDL progress, adhoc recovery).
    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    /// Watch over the set of active streaming worker nodes.
    active_streaming_nodes: ActiveStreamingWorkerNodes,

    /// Sink coordinator manager; reset at the start of every recovery attempt.
    sink_manager: SinkCoordinatorManager,

    /// Bidirectional control streams to the streaming worker nodes.
    control_stream_manager: ControlStreamManager,

    /// Term identifier regenerated (as a UUID) on every successful recovery;
    /// `"uninitialized"` before the first recovery completes.
    term_id: String,
}
101
impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    /// Create a new [`GlobalBarrierWorker`] wired to the default context
    /// implementation. Reads initial barrier interval, checkpoint frequency and
    /// per-database isolation from the current system parameters.
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;
        let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;

        // Worker-node membership starts uninitialized; it is (re)established
        // during recovery.
        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::uninitialized(metadata_manager.clone());

        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
        ));

        let control_stream_manager = ControlStreamManager::new(env.clone());

        // Seed scheduling knobs from the current system parameters; they are
        // kept up to date later via local notifications in `run_inner`.
        let reader = env.system_params_reader().await;
        let checkpoint_frequency = reader.checkpoint_frequency() as _;
        let enable_per_database_isolation = reader.per_database_isolation();
        let interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
        let periodic_barriers = PeriodicBarriers::new(interval, checkpoint_frequency);
        tracing::info!(
            "Starting barrier scheduler with: checkpoint_frequency={:?}",
            checkpoint_frequency,
        );

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            in_flight_barrier_nums,
            enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            // Replaced with a fresh UUID on the first successful recovery.
            term_id: "uninitialized".into(),
        }
    }

    /// Spawn the worker loop, returning its join handle and a oneshot sender
    /// used to request shutdown.
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            self.run(shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }

    /// Check whether we should pause on bootstrap, and if so, consume (reset)
    /// the `pause_on_next_bootstrap` system parameter so the pause is one-shot.
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                It will now be reset to `false`. \
                To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

    /// Start the worker: performs the mandatory bootstrap recovery and then
    /// enters the main event loop (`run_inner`).
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                // With recovery disabled we cannot restore pre-existing jobs,
                // so refusing to start is the only safe option.
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            // Bootstrap recovery is unconditional: even a fresh cluster goes
            // through recovery to initialize the control streams and state.
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            // Best-effort: if reading the param fails, default to not paused.
            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}
242
243impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
244 async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
245 let (local_notification_tx, mut local_notification_rx) =
246 tokio::sync::mpsc::unbounded_channel();
247 self.env
248 .notification_manager()
249 .insert_local_sender(local_notification_tx)
250 .await;
251
252 loop {
254 tokio::select! {
255 biased;
256
257 _ = &mut shutdown_rx => {
259 tracing::info!("Barrier manager is stopped");
260 break;
261 }
262
263 request = self.request_rx.recv() => {
264 if let Some(request) = request {
265 match request {
266 BarrierManagerRequest::GetDdlProgress(result_tx) => {
267 let progress = self.checkpoint_control.gen_ddl_progress();
268 if result_tx.send(progress).is_err() {
269 error!("failed to send get ddl progress");
270 }
271 }BarrierManagerRequest::AdhocRecovery(sender) => {
273 self.adhoc_recovery().await;
274 if sender.send(()).is_err() {
275 warn!("failed to notify finish of adhoc recovery");
276 }
277 }
278 }
279 } else {
280 tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
281 return;
282 }
283 }
284
285 changed_worker = self.active_streaming_nodes.changed() => {
286 #[cfg(debug_assertions)]
287 {
288 self.active_streaming_nodes.validate_change().await;
289 }
290
291 info!(?changed_worker, "worker changed");
292
293 if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
294 self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
295 }
296 }
297
298 notification = local_notification_rx.recv() => {
299 let notification = notification.unwrap();
300 if let LocalNotification::SystemParamsChange(p) = notification {
301 {
302 self.periodic_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
303 self.periodic_barriers
304 .set_checkpoint_frequency(p.checkpoint_frequency() as usize);
305 self.enable_per_database_isolation = p.per_database_isolation();
306 }
307 }
308 }
309 complete_result = self
310 .completing_task
311 .next_completed_barrier(
312 &mut self.periodic_barriers,
313 &mut self.checkpoint_control,
314 &mut self.control_stream_manager,
315 &self.context,
316 &self.env,
317 ) => {
318 match complete_result {
319 Ok(output) => {
320 self.checkpoint_control.ack_completed(output);
321 }
322 Err(e) => {
323 self.failure_recovery(e).await;
324 }
325 }
326 },
327 event = self.checkpoint_control.next_event() => {
328 let result: MetaResult<()> = try {
329 match event {
330 CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
331 let database_id = entering_initializing.database_id();
332 let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
333 resp.root_err.as_ref().map(|root_err| {
334 (*worker_id, ScoredError {
335 error: Status::internal(&root_err.err_msg),
336 score: Score(root_err.score)
337 })
338 })
339 }));
340 Self::report_collect_failure(&self.env, &error);
341 self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
342 match self.context.reload_database_runtime_info(database_id).await? { Some(runtime_info) => {
343 runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
344 warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
345 })?;
346 let workers = runtime_info.database_fragment_info.workers();
347 for worker_id in workers {
348 if !self.control_stream_manager.contains_worker(worker_id) {
349 let node = self.active_streaming_nodes.current()[&worker_id].clone();
350 self.control_stream_manager.try_add_worker(node, entering_initializing.control().inflight_infos(), self.term_id.clone(), &*self.context).await;
351 }
352 }
353 entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
354 } _ => {
355 info!(database_id = database_id.database_id, "database removed after reloading empty runtime info");
356 self.context.mark_ready(MarkReadyOptions::Database(database_id));
358 entering_initializing.remove();
359 }}
360 }
361 CheckpointControlEvent::EnteringRunning(entering_running) => {
362 self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
363 entering_running.enter();
364 }
365 }
366 };
367 if let Err(e) = result {
368 self.failure_recovery(e).await;
369 }
370 }
371 (worker_id, resp_result) = self.control_stream_manager.next_response() => {
372 let result: MetaResult<()> = try {
373 let resp = match resp_result {
374 Err(err) => {
375 let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
376 if !failed_databases.is_empty() {
377 if !self.enable_recovery {
378 panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
379 }
380 if !self.enable_per_database_isolation {
381 Err(err.clone())?;
382 }
383 Self::report_collect_failure(&self.env, &err);
384 for database_id in failed_databases {
385 if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
386 warn!(worker_id, database_id = database_id.database_id, "database entering recovery on node failure");
387 self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
388 self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
389 let output = self.completing_task.wait_completing_task().await?;
391 entering_recovery.enter(output, &mut self.control_stream_manager);
392 }
393 }
394 } else {
395 warn!(worker_id, "no barrier to collect from worker, ignore err");
396 }
397 continue;
398 }
399 Ok(resp) => resp,
400 };
401 match resp {
402 Response::CompleteBarrier(resp) => {
403 self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
404 },
405 Response::ReportDatabaseFailure(resp) => {
406 if !self.enable_recovery {
407 panic!("database failure reported but recovery not enabled: {:?}", resp)
408 }
409 if !self.enable_per_database_isolation {
410 Err(anyhow!("database {} reset", resp.database_id))?;
411 }
412 let database_id = DatabaseId::new(resp.database_id);
413 if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
414 warn!(database_id = database_id.database_id, "database entering recovery");
415 self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
416 let output = self.completing_task.wait_completing_task().await?;
418 entering_recovery.enter(output, &mut self.control_stream_manager);
419 }
420 }
421 Response::ResetDatabase(resp) => {
422 self.checkpoint_control.on_reset_database_resp(worker_id, resp);
423 }
424 other @ Response::Init(_) | other @ Response::Shutdown(_) => {
425 Err(anyhow!("get expected response: {:?}", other))?;
426 }
427 }
428 };
429 if let Err(e) = result {
430 self.failure_recovery(e).await;
431 }
432 }
433 new_barrier = self.periodic_barriers.next_barrier(&*self.context),
434 if self
435 .checkpoint_control
436 .can_inject_barrier(self.in_flight_barrier_nums) => {
437 if let Some(failed_databases) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager)
438 && !failed_databases.is_empty() {
439 if !self.enable_recovery {
440 panic!(
441 "failed to inject barrier for some databases but recovery not enabled: {:?}",
442 failed_databases.iter().map(|(database_id, e)| (database_id, e.as_report())).collect_vec()
443 );
444 }
445 let result: MetaResult<_> = try {
446 if !self.enable_per_database_isolation {
447 let errs = failed_databases.iter().map(|(database_id, e)| (database_id, e.as_report())).collect_vec();
448 let err = anyhow!("failed to inject barrier for databases: {:?}", errs);
449 Err(err)?;
450 } else {
451 for (database_id, err) in failed_databases {
452 if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
453 warn!(%database_id, e = %err.as_report(),"database entering recovery on inject failure");
454 self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(err).context("inject barrier failure").into()));
455 let output = self.completing_task.wait_completing_task().await?;
457 entering_recovery.enter(output, &mut self.control_stream_manager);
458 }
459 }
460 }
461 };
462 if let Err(e) = result {
463 self.failure_recovery(e).await;
464 }
465 }
466 }
467 }
468 self.checkpoint_control.update_barrier_nums_metrics();
469 }
470 }
471}
472
impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Drain barrier-completion state before recovery.
    ///
    /// First joins any in-flight completing task (acking its output if it
    /// succeeded); then, only if nothing failed, keeps completing the already
    /// collected barriers so their work is not lost. Finally forwards the error
    /// to `CheckpointControl` to clear its own state.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // Take ownership of the current completing task, leaving `None` behind,
        // and determine whether it ended in an error.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    // Join error (e.g. the task panicked).
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    // The task ran but completing the barrier failed.
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    // Success: ack so checkpoint state stays consistent.
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // Best-effort: complete remaining collected barriers inline so their
            // progress is persisted before we tear things down. Stop at the
            // first failure.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}
537
538impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
539 async fn failure_recovery(&mut self, err: MetaError) {
541 self.clear_on_err(&err).await;
542
543 if self.enable_recovery {
544 let span = tracing::info_span!(
545 "failure_recovery",
546 error = %err.as_report(),
547 );
548
549 crate::telemetry::report_event(
550 risingwave_pb::telemetry::TelemetryEventStage::Recovery,
551 "failure_recovery",
552 0,
553 None,
554 None,
555 None,
556 );
557
558 let reason = RecoveryReason::Failover(err);
559
560 self.recovery(false, reason).instrument(span).await;
563 } else {
564 panic!(
565 "a streaming error occurred while recovery is disabled, aborting: {:?}",
566 err.as_report()
567 );
568 }
569 }
570
571 async fn adhoc_recovery(&mut self) {
572 let err = MetaErrorInner::AdhocRecovery.into();
573 self.clear_on_err(&err).await;
574
575 let span = tracing::info_span!(
576 "adhoc_recovery",
577 error = %err.as_report(),
578 );
579
580 crate::telemetry::report_event(
581 risingwave_pb::telemetry::TelemetryEventStage::Recovery,
582 "adhoc_recovery",
583 0,
584 None,
585 None,
586 None,
587 );
588
589 self.recovery(false, RecoveryReason::Adhoc)
592 .instrument(span)
593 .await;
594 }
595}
596
597impl<C> GlobalBarrierWorker<C> {
598 pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
600 use risingwave_pb::meta::event_log;
602 let event = event_log::EventCollectBarrierFail {
603 error: error.to_report_string(),
604 };
605 env.event_log_manager_ref()
606 .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
607 }
608}
609
/// Retry/backoff strategies used during recovery. The nested module plus
/// type-alias-impl-trait (`type … = impl …`) lets the opaque future/iterator
/// types be named (`RetryBackoffFuture`, `RetryBackoffStrategy`) by other code
/// without exposing the concrete types.
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    // Base interval (in milliseconds) of the exponential backoff for recovery retries.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    // Cap on the backoff delay between recovery retries.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    mod retry_backoff_future {
        use std::future::Future;
        use std::time::Duration;

        use tokio::time::sleep;

        // Opaque, nameable future type for a single backoff sleep.
        // NOTE: `get_retry_backoff_future` below is the defining use of this
        // type alias; its body (Box::pin(sleep(..))) fixes the concrete type.
        pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
        pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
            Box::pin(sleep(duration))
        }
    }
    pub(crate) use retry_backoff_future::*;

    // Opaque, nameable iterator of backoff futures (defined by
    // `get_retry_backoff_strategy` below).
    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    #[inline(always)]
    /// Infinite exponential-backoff delay sequence (20ms base, 5s cap, jittered)
    /// used to drive recovery retries.
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    /// Same sequence as [`get_retry_strategy`], but as ready-to-await sleep futures.
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}
648
649pub(crate) use retry_strategy::*;
650use risingwave_common::error::tonic::extra::{Score, ScoredError};
651use risingwave_meta_model::WorkerId;
652use risingwave_pb::meta::event_log::{Event, EventRecovery};
653
654use crate::barrier::edge_builder::FragmentEdgeBuilder;
655
impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Perform a full (global) recovery: abort and block all scheduling, retry
    /// `recovery_inner` until it succeeds, then mark the cluster ready again
    /// (keeping still-recovering databases blocked).
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Drop all existing control streams; they belong to the previous term.
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };

        // Block all databases (None = global) while recovery runs.
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    /// One retry-until-success recovery loop. Each attempt:
    /// 1. reloads and validates runtime info from meta store,
    /// 2. resets sinks and opens fresh control streams under a new term id,
    /// 3. injects an initial barrier per database and collects the responses,
    ///    downgrading individual databases to "failed" where allowed,
    /// 4. rebuilds `CheckpointControl` and swaps the worker's state in place.
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        // Measure end-to-end recovery latency (label "global").
        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            // Fail any jobs still being created; they cannot survive recovery.
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_fragment_infos,
                mut state_table_committed_epochs,
                mut subscription_infos,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
            } = runtime_info_snapshot;

            self.sink_manager.reset().await;
            // A fresh term id fences out stale messages from previous terms.
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
            let reset_start_time = Instant::now();
            let unconnected_worker = control_stream_manager
                .reset(
                    active_streaming_nodes.current(),
                    term_id.clone(),
                    &*self.context,
                )
                .await;
            info!(elapsed=?reset_start_time.elapsed(), ?unconnected_worker, "control stream reset");

            {
                let mut builder = FragmentEdgeBuilder::new(database_fragment_infos.values().flat_map(|info| info.fragment_infos()));
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                // Databases whose initial barrier is fully collected / still in
                // flight / already failed, respectively.
                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                for (database_id, info) in database_fragment_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        info,
                        &mut state_table_committed_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        subscription_infos.remove(&database_id).unwrap_or_default(),
                        is_paused,
                        &hummock_version_stats,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            // Injection failed: the database is marked failed and
                            // handled by per-database recovery later (or fails the
                            // attempt below when isolation is disabled).
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        // Degenerate case: no worker needed to collect the barrier.
                        warn!(database_id = database_id.database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                // Drain control-stream responses until every database's initial
                // barrier is either collected or its database marked failed.
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response().await;
                    let resp = match result {
                        Err(e) => {
                            warn!(worker_id, err = %e.as_report(), "worker node failure during recovery");
                            // Fail every database that still depended on this worker.
                            for (failed_database_id,_ ) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, worker_id, "database failed to recovery in global recovery due to worker node err");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = DatabaseId::new(resp.database_id);
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        // Must already be known-failed.
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_))=> {
                                    return Err(anyhow!("get unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id as WorkerId);
                    let database_id = DatabaseId::new(resp.database_id);
                    if failed_databases.contains(&database_id) {
                        // Late response from a failed database; drop it.
                        assert!(!collecting_databases.contains_key(&database_id));
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                // Leftovers below indicate metadata referencing entities that no
                // database consumed — harmless but worth logging.
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mview in recovery");
                }
                if !subscription_infos.is_empty() {
                    warn!(?subscription_infos, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                // Without per-database isolation, any failed database fails the
                // whole attempt (and the retry loop runs again).
                if !self.enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.database_id).collect_vec()).into()
                    );
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                );
                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"))
        .await
        .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        // Atomically swap in the freshly recovered state.
        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.database_id)
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.database_id)
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        // Tell frontends that recovery finished so they can refresh state.
        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}
916}