use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{
    ControlStreamManager, WorkerNodeEvent, from_partial_graph_id, merge_node_rpc_errors,
};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, RecoveryReason,
    schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

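/// The global barrier worker of the meta node. It schedules periodic barriers,
/// collects completion responses from streaming workers over the control
/// streams, and drives bootstrap, failover and ad-hoc recovery.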
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether recovery is enabled; read from `env.opts.enable_recovery`.
    enable_recovery: bool,

    /// Sources of periodic barrier ticks, system-wide and per database.
    periodic_barriers: PeriodicBarriers,

    /// Whether per-database failure isolation is enabled in system params.
    system_enable_per_database_isolation: bool,

    /// Context through which the worker talks to the rest of the meta node.
    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    /// Per-database state machine of in-flight and completing barriers.
    checkpoint_control: CheckpointControl,

    /// The currently running barrier-completion task, if any.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    /// The set of streaming compute nodes currently known to be active.
    active_streaming_nodes: ActiveStreamingWorkerNodes,

    /// Manages the control streams to all streaming worker nodes.
    control_stream_manager: ControlStreamManager,

    /// Identifier of the current term; regenerated on each global recovery.
    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
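    /// Creates a worker from the environment, request channel and context.
    /// The control stream manager and periodic barriers start empty and are
    /// rebuilt, together with a fresh `term_id`, during recovery.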
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let periodic_barriers = PeriodicBarriers::default();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
        refresh_manager: GlobalRefreshManagerRef,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
            refresh_manager,
            sink_manager,
        ));

        Self::new_inner(env, request_rx, context).await
    }

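    /// Spawns the worker onto the runtime and returns its join handle together
    /// with a shutdown sender that stops the event loop when signalled.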
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

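    /// Returns whether the next bootstrap should start with sources paused, as
    /// requested by the `pause_on_next_bootstrap` system parameter or the meta
    /// option, and resets the parameter so it only applies once.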
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap()
            || self.env.opts.pause_on_next_bootstrap_offline;

        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                It will now be reset to `false`. \
                To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

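    /// Runs the worker: refuses to start if recovery is disabled while streaming
    /// jobs already exist, performs the bootstrap recovery (possibly with
    /// sources paused), and then enters the main loop in `run_inner`.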
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                    or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
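    /// Per-database failure isolation takes effect only when both the system
    /// parameter is enabled and the `DatabaseFailureIsolation` license feature
    /// is available; otherwise a database failure escalates to global recovery.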
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

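    /// The main event loop. A biased `tokio::select!` multiplexes shutdown,
    /// external requests, worker membership changes, system parameter changes,
    /// barrier completion, checkpoint-control state transitions, control stream
    /// events, and periodic barrier injection.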
    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetBackfillProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_backfill_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send backfill progress");
                                }
                            }
                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_cdc_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send cdc progress");
                                }
                            }
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            } => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    match changed_worker {
                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
                            self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                        }
                        ActiveStreamingWorkerChange::Remove(node) => {
                            self.control_stream_manager.remove_worker(node);
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                        }
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                    ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await.and_then(|runtime_info| {
                                    runtime_info.map(|runtime_info| {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(%database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        Ok(runtime_info)
                                    })
                                    .transpose()
                                }) {
                                    Ok(Some(runtime_info)) => {
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    Ok(None) => {
                                        info!(%database_id, "database removed after reloading empty runtime info");
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                    Err(e) => {
                                        entering_initializing.fail_reload_runtime_info(e);
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, event) = self.control_stream_manager.next_event(&self.term_id, &self.context) => {
                    let resp_result = match event {
                        WorkerNodeEvent::Response(result) => {
                            result
                        }
                        WorkerNodeEvent::Connected(connected) => {
                            connected.initialize(self.checkpoint_control.inflight_infos());
                            continue;
                        }
                    };
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportPartialGraphFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", database_id))?;
                                }
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(%database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetPartialGraph(resp) => {
                                self.checkpoint_control.on_reset_partial_graph_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("got unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
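    /// Drains the in-flight completing task if possible, acknowledging barriers
    /// that still complete successfully, then clears checkpoint-control state
    /// with the given error before recovery proceeds.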
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
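    /// Clears in-flight state and triggers a global recovery for the error, or
    /// panics if recovery is disabled.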
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

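    /// Global recovery triggered explicitly via
    /// `BarrierManagerRequest::AdhocRecovery`, independent of any failure.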
    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
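    /// Records a barrier collection failure in the meta event log.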
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

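/// Backoff strategy used when retrying a failed global recovery attempt.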
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    /// Base interval of the exponential backoff, in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    /// Cap on the backoff interval between recovery retries.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// An infinite, jittered exponential backoff over the retry intervals.
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    /// The same backoff, mapped into ready-to-await sleep futures.
    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_pb::meta::event_log::{Event, EventRecovery};

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
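    /// Performs a global recovery: clears all control streams, blocks barrier
    /// scheduling, runs `recovery_inner` (which retries until it succeeds), and
    /// finally marks the cluster ready again, keeping still-recovering
    /// databases blocked.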
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

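    /// A single retried recovery pass: reload the runtime info snapshot,
    /// re-establish control streams under a fresh `term_id`, inject and collect
    /// the initial barrier for every database, and swap the rebuilt state into
    /// the worker. When per-database isolation is enabled, databases that fail
    /// here are marked as recovering instead of failing the whole attempt.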
    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;

            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut backfill_orders,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut mv_depended_subscriptions,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_splits,
            } = runtime_info_snapshot;

            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::recover(
                self.env.clone(),
                active_streaming_nodes.current(),
                &term_id,
                self.context.clone(),
            )
            .await;

            {
                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashMap::new();
                for (database_id, jobs) in database_job_infos {
                    let mut injected_creating_jobs = HashSet::new();
                    let database_backfill_orders = jobs.keys().map(|job_id| (*job_id, backfill_orders.remove(job_id).unwrap_or_default())).collect();
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        database_backfill_orders,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &fragment_relations,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        &mut mv_depended_subscriptions,
                        is_paused,
                        &hummock_version_stats,
                        &mut cdc_table_snapshot_splits,
                        &mut injected_creating_jobs,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id, injected_creating_jobs).is_none(), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(%database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response(&term_id, &self.context).await;
                    let resp = match result {
                        Err(e) => {
                            warn!(%worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, collector) in collecting_databases.extract_if(|_, collector| {
                                !collector.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, %worker_id, "database failed to recover during global recovery due to worker node error");
                                assert!(failed_databases.insert(failed_database_id, collector.creating_job_ids().collect()).is_none());
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportPartialGraphFailure(resp) => {
                                    let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
                                    if let Some(collector) = collecting_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id, collector.creating_job_ids().collect()).is_none());
                                    } else if let Some(database) = collected_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id, database.creating_streaming_job_controls.keys().copied().collect()).is_none());
                                    } else {
                                        assert!(failed_databases.contains_key(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetPartialGraph(_)) => {
                                    return Err(anyhow!("get unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id);
                    let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
                    if failed_databases.contains_key(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background mview in recovery");
                }
                if !mv_depended_subscriptions.is_empty() {
                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.keys().collect_vec()).into()
                    );
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"))
        .await
        .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}