use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, WorkerNodeEvent, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, RecoveryReason,
    schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

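/// Worker that drives the global barrier loop on the meta node: it injects barriers
/// scheduled by [`PeriodicBarriers`], collects barrier completion responses from
/// compute nodes through [`ControlStreamManager`], and performs recovery (global or
/// per-database) when failures are detected.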
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to run recovery when a failure is encountered. If disabled, any failure
    /// aborts the meta node instead.
    enable_recovery: bool,

    /// Drives periodic barrier injection and checkpoint frequency per database.
    periodic_barriers: PeriodicBarriers,

    /// Value of the `per_database_isolation` system parameter. Effective only together
    /// with the license check, see `enable_per_database_isolation`.
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// The barrier-completion task that is currently running, if any.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    control_stream_manager: ControlStreamManager,

    /// Unique id of the current recovery term, regenerated on every global recovery.
    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
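    /// Create an uninitialized worker from the environment and the request channel. The
    /// recovery setting and the per-database isolation flag are read once here; the
    /// control streams and checkpoint state are populated later during recovery.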
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let periodic_barriers = PeriodicBarriers::default();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
        refresh_manager: GlobalRefreshManagerRef,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
            refresh_manager,
            sink_manager,
        ));

        Self::new_inner(env, request_rx, context).await
    }

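    /// Spawn the worker loop on a new task. Returns the join handle together with a
    /// oneshot sender that can be used to ask the worker to shut down.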
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

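    /// Check whether the cluster should bootstrap with all data sources paused, either
    /// because the `pause_on_next_bootstrap` system parameter is set or because the
    /// `pause_on_next_bootstrap_offline` option is enabled. The system parameter is a
    /// one-shot flag and is reset to `false` once it has been consumed.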
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap()
            || self.env.opts.pause_on_next_bootstrap_offline;

        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                It will now be reset to `false`. \
                To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

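    /// Entry point of the worker: performs sanity checks, runs the bootstrap recovery,
    /// and then enters the main event loop in [`Self::run_inner`].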
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            // With recovery disabled, refuse to start if streaming jobs already exist:
            // their state cannot be restored without a recovery pass.
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                    or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            // Always run a bootstrap recovery before serving any request.
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Per-database failure isolation takes effect only when both the
    /// `per_database_isolation` system parameter is enabled and the
    /// `DatabaseFailureIsolation` license feature is available.
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

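    /// Main event loop. Handles shutdown, external requests, worker membership changes,
    /// system parameter updates, barrier completion, per-database recovery events,
    /// control stream responses, and injection of newly scheduled barriers.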
    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        loop {
            tokio::select! {
                biased;

                // Shutdown signal from the meta node.
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                // External requests forwarded by the barrier manager handle.
                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_cdc_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get cdc progress");
                                }
                            }
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            } => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                // Streaming worker membership changes (node added, updated or removed).
                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    match changed_worker {
                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
                            self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                        }
                        ActiveStreamingWorkerChange::Remove(node) => {
                            self.control_stream_manager.remove_worker(node);
                        }
                    }
                }

                // System parameter changes that affect barrier scheduling.
                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                        }
                    }
                }

                // A barrier has finished completing; acknowledge it, or enter recovery on error.
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                    ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                // Per-database state transitions driven by checkpoint control
                // (entering initializing after a reset, or back to running).
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await.and_then(|runtime_info| {
                                    runtime_info.map(|runtime_info| {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(%database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        Ok(runtime_info)
                                    })
                                    .transpose()
                                }) {
                                    Ok(Some(runtime_info)) => {
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    Ok(None) => {
                                        info!(%database_id, "database removed after reloading empty runtime info");
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                    Err(e) => {
                                        entering_initializing.fail_reload_runtime_info(e);
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                // Responses and connection events from the per-worker control streams.
                (worker_id, event) = self.control_stream_manager.next_event(&self.term_id, &self.context) => {
                    let resp_result = match event {
                        WorkerNodeEvent::Response(result) => {
                            result
                        }
                        WorkerNodeEvent::Connected(connected) => {
                            connected.initialize(self.checkpoint_control.inflight_infos());
                            continue;
                        }
                    };
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        // Without per-database isolation, escalate to global recovery.
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = resp.database_id;
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(%database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("got unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                // A newly scheduled barrier is ready to be injected.
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
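    /// Flush the completing state before entering recovery: wait for any in-flight
    /// completing task, then try to complete the barriers that have already been
    /// collected, and finally clear the checkpoint control state with the given error.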
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // Join the in-flight completing task, if any, and acknowledge its result.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // Try to complete the barriers that have already been fully collected.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
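    /// Enter global recovery after a streaming error. Panics if recovery is disabled.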
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

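    /// Enter global recovery on an explicit external request, independent of any
    /// streaming error.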
    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
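    /// Record a barrier collection failure in the event log so it can be inspected later.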
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

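/// Backoff strategy used when retrying a failed recovery attempt: exponential backoff
/// with jitter, starting from `RECOVERY_RETRY_BASE_INTERVAL` milliseconds and capped
/// at `RECOVERY_RETRY_MAX_INTERVAL`.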
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    /// Base interval of the exponential backoff, in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    /// Maximum interval between two recovery retries.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
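    /// Run a full (global) recovery: clear all control streams, abort and block all
    /// databases with the given reason, retry [`Self::recovery_inner`] until it succeeds,
    /// and finally mark the cluster ready again, leaving still-recovering databases blocked.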
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

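    /// Retry the recovery attempt until it succeeds. Each attempt reloads the runtime
    /// info snapshot, establishes fresh control streams under a new term id, injects the
    /// initial barrier for every database, collects the responses, and rebuilds the
    /// checkpoint control and periodic barrier state from the result.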
    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;

            // Reload and validate the runtime info snapshot for all databases.
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut mv_depended_subscriptions,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_splits,
            } = runtime_info_snapshot;

            // Every recovery attempt starts a new term with fresh control streams.
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::recover(
                self.env.clone(),
                active_streaming_nodes.current(),
                &term_id,
                self.context.clone(),
            )
            .await;

            {
                let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|jobs| jobs.values().flat_map(|fragments| fragments.values())), &control_stream_manager);
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                // Inject the initial barrier for every database. Databases that fail to
                // inject are marked as failed and handled after recovery finishes.
                for (database_id, jobs) in database_job_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        &mut mv_depended_subscriptions,
                        is_paused,
                        &hummock_version_stats,
                        &mut cdc_table_snapshot_splits,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(%database_id, "database has no nodes to collect the initial barrier from");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                // Collect the initial barrier responses. A worker error fails every
                // database that was still collecting from that worker.
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response(&term_id, &self.context).await;
                    let resp = match result {
                        Err(e) => {
                            warn!(%worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, %worker_id, "database failed to recover in global recovery due to worker node error");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = resp.database_id;
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, %worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("got unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id);
                    let database_id = resp.database_id;
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mview in recovery");
                }
                if !mv_depended_subscriptions.is_empty() {
                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.as_raw_id()).collect_vec()
                    )
                    .into());
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"))
        .await
        .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}