use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, WorkerNodeEvent, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, RecoveryReason,
    schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};
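/// Drives global barrier injection and collection for the whole cluster.
///
/// The worker owns the control streams to all streaming worker nodes, schedules
/// periodic barriers per database, tracks in-flight and completing barriers via
/// [`CheckpointControl`] and [`CompletingTask`], and runs global or per-database
/// recovery when a worker node or a database fails.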
pub(super) struct GlobalBarrierWorker<C> {
    /// Enable recovery or not when failover.
    enable_recovery: bool,

    /// Schedules periodic barriers, with per-database barrier intervals and checkpoint
    /// frequencies.
    periodic_barriers: PeriodicBarriers,

    /// Whether per-database failure isolation is enabled by the system parameter.
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// The barrier-completing task that is currently running, if any.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    sink_manager: SinkCoordinatorManager,

    control_stream_manager: ControlStreamManager,

    /// Identifier of the current term, regenerated on every recovery.
    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
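    /// Create the worker with an uninitialized term and worker-node set; the caller is
    /// expected to run recovery (bootstrap) before barriers can be injected.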
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        sink_manager: SinkCoordinatorManager,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let periodic_barriers = PeriodicBarriers::default();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
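    /// Create the worker together with its default context implementation
    /// ([`GlobalBarrierWorkerContextImpl`]), starting in the `Starting` status.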
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
        ));

        Self::new_inner(env, sink_manager, request_rx, context).await
    }

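    /// Spawn the worker onto the runtime and return its join handle together with a
    /// oneshot sender that shuts the worker down when triggered or dropped.
    ///
    /// A minimal usage sketch (the construction arguments are elided and assumed to be
    /// available from the surrounding meta-node setup):
    ///
    /// ```ignore
    /// let worker = GlobalBarrierWorker::new(/* ... */).await;
    /// let (join_handle, shutdown_tx) = worker.start();
    /// // ... later, during meta-node shutdown:
    /// let _ = shutdown_tx.send(());
    /// join_handle.await.unwrap();
    /// ```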
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

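    /// Check whether all data sources should be paused on bootstrap, as specified by the
    /// system parameter `pause_on_next_bootstrap`. The parameter is reset to `false`
    /// once consumed, so it only takes effect for a single bootstrap.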
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                 It will now be reset to `false`. \
                 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

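    /// Start the worker: panic if recovery is disabled but streaming jobs already exist,
    /// otherwise run bootstrap recovery and then enter the main event loop until
    /// `shutdown_rx` fires.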
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                     or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
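    /// Per-database failure isolation is effective only when both the system parameter
    /// is enabled and the `DatabaseFailureIsolation` license feature is available.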
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

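    /// The main event loop of the barrier worker. Each iteration handles one of:
    /// shutdown, a [`BarrierManagerRequest`], a worker-node change, a system-params
    /// change, a finished barrier-completion task, a checkpoint-control state
    /// transition, a control-stream event, or a newly scheduled barrier.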
    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        loop {
            tokio::select! {
                biased;

                // With `biased`, arms are polled in order, so shutdown has the highest priority.
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            } => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    match changed_worker {
                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
                            self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                        }
                        ActiveStreamingWorkerChange::Remove(node) => {
                            self.control_stream_manager.remove_worker(node);
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                        self.periodic_barriers
                            .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                        self.system_enable_per_database_isolation = p.per_database_isolation();
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                    ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(
                                    &format!("database {} reset", database_id),
                                    entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                        resp.root_err.as_ref().map(|root_err| {
                                            (*worker_id, ScoredError {
                                                error: Status::internal(&root_err.err_msg),
                                                score: Score(root_err.score),
                                            })
                                        })
                                    }),
                                );
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await? {
                                    Some(runtime_info) => {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(%database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    None => {
                                        info!(%database_id, "database removed after reloading empty runtime info");
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, event) = self.control_stream_manager.next_event(&self.term_id, &self.context) => {
                    let resp_result = match event {
                        WorkerNodeEvent::Response(result) => {
                            result
                        }
                        WorkerNodeEvent::Connected(connected) => {
                            connected.initialize(self.checkpoint_control.inflight_infos());
                            continue;
                        }
                    };
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        // Fall back to global recovery when per-database isolation is unavailable.
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = resp.database_id;
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(%database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("get unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
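    /// Drain the in-flight completing work before recovery: wait for the barrier that is
    /// currently being completed, then try to complete the remaining collected barriers,
    /// and finally clear [`CheckpointControl`] with the given error.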
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // Join the in-flight completing task, if any, so that no barrier is left half-completed.
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            // The completing task finished cleanly: keep completing the barriers that
            // have already been collected, until the first failure.
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
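    /// Enter global recovery with the given error: clear pending completing work, then
    /// recover if recovery is enabled, or panic otherwise.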
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

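    /// Recovery triggered manually through [`BarrierManagerRequest::AdhocRecovery`].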
    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
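    /// Record a barrier-collection failure in the event log for later inspection.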
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        // Record failure in event log.
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

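/// Backoff strategy used when retrying recovery: exponential backoff starting at 20 ms,
/// capped at 5 s, with jitter applied to every delay.
///
/// A minimal sketch of how the strategy is consumed by `tokio_retry` (as done in
/// `recovery_inner` below; `run_recovery_attempt` is a placeholder for the retried
/// async operation):
///
/// ```ignore
/// let new_state = tokio_retry::Retry::spawn(get_retry_strategy(), || async {
///     run_recovery_attempt().await
/// })
/// .await?;
/// ```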
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    /// Base interval of the recovery retry backoff, in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    /// Maximum interval of the recovery retry backoff.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
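    /// Recover the whole cluster from the latest committed state.
    ///
    /// Clears the control streams, aborts and blocks all databases for the given
    /// `recovery_reason`, then runs [`Self::recovery_inner`], which retries until
    /// recovery succeeds. Afterwards the cluster is marked ready, keeping any databases
    /// that are still recovering blocked. If `is_paused` is true, data sources are left
    /// paused after recovery.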
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Tear down the control streams of the previous term before recovering.
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            // Invalidate cached stream clients and fail any ongoing job creation before
            // reloading the runtime metadata.
            self.env.stream_client_pool().invalidate_all();
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut mv_depended_subscriptions,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_split_assignment,
            } = runtime_info_snapshot;

            self.sink_manager.reset().await;
            // Every recovery starts a new term.
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::recover(
                self.env.clone(),
                active_streaming_nodes.current(),
                &term_id,
                self.context.clone(),
            )
            .await;

            {
                let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|jobs| jobs.values().flat_map(|fragments| fragments.values())), &control_stream_manager);
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                // Inject the initial barrier of each database and collect it from all
                // involved worker nodes. Databases that fail at any step are isolated
                // into `failed_databases`.
                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                for (database_id, jobs) in database_job_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        &mut mv_depended_subscriptions,
                        is_paused,
                        &hummock_version_stats,
                        &mut cdc_table_snapshot_split_assignment,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(%database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response(&term_id, &self.context).await;
                    let resp = match result {
                        Err(e) => {
                            warn!(%worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, %worker_id, "database failed to recover during global recovery due to worker node error");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = resp.database_id;
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, %worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("get unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id);
                    let database_id = resp.database_id;
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mview in recovery");
                }
                if !mv_depended_subscriptions.is_empty() {
                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.as_raw_id()).collect_vec()
                    )
                    .into());
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"))
        .await
        .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}