use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
    RecoveryReason, schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

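/// The global barrier worker of the meta node: it injects barriers into the
/// registered compute nodes through their control streams, collects the
/// responses, and drives per-database or global recovery on failures.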
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether recovery is enabled on failover; when disabled, streaming errors panic.
    enable_recovery: bool,

    /// Scheduler of periodic barriers and checkpoint frequency.
    periodic_barriers: PeriodicBarriers,

    /// The maximum number of barriers allowed in flight.
    in_flight_barrier_nums: usize,

    /// Whether per-database failure isolation is enabled by system params.
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// The barrier-completing task currently in progress, if any.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    sink_manager: SinkCoordinatorManager,

    control_stream_manager: ControlStreamManager,

    term_id: String,
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;
        let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::uninitialized(metadata_manager.clone());

        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
        ));

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let checkpoint_frequency = reader.checkpoint_frequency() as _;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
        let periodic_barriers = PeriodicBarriers::new(interval, checkpoint_frequency);
        tracing::info!(
            "Starting barrier scheduler with: checkpoint_frequency={:?}",
            checkpoint_frequency,
        );

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            in_flight_barrier_nums,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }

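    /// Spawn the worker loop on a new tokio task, returning its join handle and a
    /// oneshot sender that requests shutdown.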
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

    /// Check whether we should pause on bootstrap from the system parameter and reset it.
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                It will now be reset to `false`. \
                To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

    /// Run bootstrap recovery first, then enter the main event loop of the worker.
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                    or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
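    /// Per-database failure isolation takes effect only when both the system parameter
    /// and the `DatabaseFailureIsolation` license feature allow it.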
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

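    /// The main event loop: multiplexes shutdown, barrier manager requests, worker
    /// membership changes, system parameter changes, barrier completion, checkpoint
    /// control events, control stream responses, and periodic barrier injection.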
    async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

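                // Requests from the barrier manager handle (DDL progress, ad-hoc recovery).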
                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

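                // Streaming worker membership changed: connect control streams to new or updated workers.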
                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
                        self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                    }
                }

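                // System parameter changes: update barrier interval, checkpoint frequency and isolation flag.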
                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        self.periodic_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                        self.periodic_barriers
                            .set_checkpoint_frequency(p.checkpoint_frequency() as usize);
                        self.system_enable_per_database_isolation = p.per_database_isolation();
                    }
                }
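                // A barrier completing task finished: acknowledge it, or trigger recovery on error.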
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                    ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
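                // Per-database checkpoint control state transitions (entering initializing/running).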
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await? {
                                    Some(runtime_info) => {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        let workers = InflightFragmentInfo::workers(runtime_info.job_infos.values().flat_map(|job| job.fragment_infos()));
                                        for worker_id in workers {
                                            if !self.control_stream_manager.is_connected(worker_id) {
                                                self.control_stream_manager.try_reconnect_worker(worker_id, entering_initializing.control().inflight_infos(), self.term_id.clone(), &*self.context).await;
                                            }
                                        }
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    _ => {
                                        info!(database_id = database_id.database_id, "database removed after reloading empty runtime info");
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
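                // Responses (or stream errors) from compute-node control streams.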
                (worker_id, resp_result) = self.control_stream_manager.next_response() => {
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(worker_id, database_id = database_id.database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = DatabaseId::new(resp.database_id);
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(database_id = database_id.database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("get unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
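                // A new barrier is due: reconnect any workers needed by newly created actors, then inject it.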
                new_barrier = self.periodic_barriers.next_barrier(&*self.context),
                    if self
                        .checkpoint_control
                        .can_inject_barrier(self.in_flight_barrier_nums) => {
                    if let Some((_, Command::CreateStreamingJob { info, .. }, _)) = &new_barrier.command {
                        let worker_ids: HashSet<_> =
                            info.stream_job_fragments.inner
                                .actors_to_create()
                                .flat_map(|(_, _, actors)|
                                    actors.map(|(_, worker_id)| worker_id)
                                )
                                .collect();
                        for worker_id in worker_ids {
                            if !self.control_stream_manager.is_connected(worker_id) {
                                self.control_stream_manager.try_reconnect_worker(worker_id, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                            }
                        }
                    }
                    if let Some(failed_databases) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager)
                        && !failed_databases.is_empty() {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier for some databases but recovery not enabled: {:?}",
                                failed_databases.iter().map(|(database_id, e)| (database_id, e.as_report())).collect_vec()
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let errs = failed_databases.iter().map(|(database_id, e)| (database_id, e.as_report())).collect_vec();
                                let err = anyhow!("failed to inject barrier for databases: {:?}", errs);
                                Err(err)?;
                            } else {
                                for (database_id, err) in failed_databases {
                                    if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                        warn!(%database_id, e = %err.as_report(), "database entering recovery on inject failure");
                                        self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(err).context("inject barrier failure").into()));
                                        let output = self.completing_task.wait_completing_task().await?;
                                        entering_recovery.enter(output, &mut self.control_stream_manager);
                                    }
                                }
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Wait for the in-flight completing task to finish, try to complete the remaining
    /// buffered barriers, and then clear the checkpoint control state with the given
    /// error before recovery.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Clear the current state and run a global recovery for the given error if recovery
    /// is enabled; panic otherwise.
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
    /// Record a barrier-collection failure in the event log so that it is visible to users.
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

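/// Backoff strategy for retrying recovery: exponential backoff with jitter, starting
/// from a small base interval and capped at a maximum delay.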
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    /// Base interval (in milliseconds) of the exponential backoff for recovery retries.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    /// Maximum interval between recovery retry attempts.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    mod retry_backoff_future {
        use std::future::Future;
        use std::time::Duration;

        use tokio::time::sleep;

        pub(crate) type RetryBackoffFuture = impl Future<Output = ()> + Unpin + Send + 'static;
        pub(super) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
            Box::pin(sleep(duration))
        }
    }
    pub(crate) use retry_backoff_future::*;

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::controller::fragment::InflightFragmentInfo;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Run a global recovery: clear the control streams, abort in-flight work and mark all
    /// databases as blocked, run the recovery loop, and finally mark the cluster ready again,
    /// keeping the databases that are still in per-database recovery blocked.
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

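    /// Retry the recovery procedure with backoff until it succeeds: reload the runtime info,
    /// reset control streams under a fresh term id, inject and collect an initial barrier per
    /// database, and rebuild the checkpoint control state.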
    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut subscription_infos,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
            } = runtime_info_snapshot;

            self.sink_manager.reset().await;
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
            let reset_start_time = Instant::now();
            let unconnected_worker = control_stream_manager
                .reset(
                    active_streaming_nodes.current(),
                    term_id.clone(),
                    &*self.context,
                )
                .await;
            info!(elapsed=?reset_start_time.elapsed(), ?unconnected_worker, "control stream reset");

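            // Inject the initial barrier for each database over the freshly reset control streams,
            // then collect the responses below.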
            {
                let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|info| info.values().flatten()), &control_stream_manager);
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                for (database_id, jobs) in database_job_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        subscription_infos.remove(&database_id).unwrap_or_default(),
                        is_paused,
                        &hummock_version_stats,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(database_id = database_id.database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
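                // Wait until every database has either collected its initial barrier or been marked failed.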
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response().await;
                    let resp = match result {
                        Err(e) => {
                            warn!(worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, worker_id, "database failed to recover in global recovery due to worker node err");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = DatabaseId::new(resp.database_id);
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("get unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id as WorkerId);
                    let database_id = DatabaseId::new(resp.database_id);
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
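                // Sanity checks: all reloaded info should have been consumed by the initial barriers.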
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mview in recovery");
                }
                if !subscription_infos.is_empty() {
                    warn!(?subscription_infos, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.database_id).collect_vec()
                    )
                    .into());
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );
                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"))
        .await
        .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.database_id)
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.database_id)
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}