use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
    RecoveryReason, schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

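/// The global barrier worker running on the meta node. It drives periodic barrier
/// injection, collects barrier completion responses from compute nodes over the
/// control streams, and performs global or per-database recovery on failure.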
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to run recovery when a failure is encountered. If disabled, the worker
    /// panics on failure instead.
    enable_recovery: bool,

    /// Periodic barrier and checkpoint triggers, system-wide and per database.
    periodic_barriers: PeriodicBarriers,

    /// The maximum number of barriers allowed in flight.
    in_flight_barrier_nums: usize,

    /// Whether per-database failure isolation is enabled by the system parameter.
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    sink_manager: SinkCoordinatorManager,

    control_stream_manager: ControlStreamManager,

    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        sink_manager: SinkCoordinatorManager,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;
        let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let periodic_barriers = PeriodicBarriers::default();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            in_flight_barrier_nums,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
        ));

        Self::new_inner(env, sink_manager, request_rx, context).await
    }

    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

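    /// Check whether we should pause all data sources on bootstrap, as configured by the
    /// `pause_on_next_bootstrap` system parameter, and reset the parameter to `false` so
    /// that it only takes effect once.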
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                It will now be reset to `false`. \
                To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

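    /// Start the barrier worker: panic early if recovery is disabled while streaming jobs
    /// already exist, run bootstrap recovery, and then enter the main event loop.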
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                    or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

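    /// The main event loop of the barrier worker. It handles, in order of priority:
    /// shutdown, external requests, worker membership changes, system parameter changes,
    /// barrier completion, per-database checkpoint-control events, control stream
    /// responses, and periodic barrier injection.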
    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            } => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
                        self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                        self.periodic_barriers
                            .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                        self.system_enable_per_database_isolation = p.per_database_isolation();
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                    ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(
                                    &format!("database {} reset", database_id),
                                    entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                        resp.root_err.as_ref().map(|root_err| {
                                            (*worker_id, ScoredError {
                                                error: Status::internal(&root_err.err_msg),
                                                score: Score(root_err.score),
                                            })
                                        })
                                    }),
                                );
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await? {
                                    Some(runtime_info) => {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        let workers = InflightFragmentInfo::workers(runtime_info.job_infos.values().flat_map(|job| job.fragment_infos()));
                                        for worker_id in workers {
                                            if !self.control_stream_manager.is_connected(worker_id) {
                                                self.control_stream_manager.try_reconnect_worker(worker_id, entering_initializing.control().inflight_infos(), self.term_id.clone(), &*self.context).await;
                                            }
                                        }
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    _ => {
                                        info!(database_id = database_id.database_id, "database removed after reloading empty runtime info");
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, resp_result) = self.control_stream_manager.next_response() => {
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(worker_id, database_id = database_id.database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = DatabaseId::new(resp.database_id);
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(database_id = database_id.database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("get unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context),
                    if self
                        .checkpoint_control
                        .can_inject_barrier(self.in_flight_barrier_nums) => {
                    if let Some((Command::CreateStreamingJob { info, .. }, _)) = &new_barrier.command {
                        let worker_ids: HashSet<_> =
                            info.stream_job_fragments.inner
                                .actors_to_create()
                                .flat_map(|(_, _, actors)|
                                    actors.map(|(_, worker_id)| worker_id)
                                )
                                .collect();
                        for worker_id in worker_ids {
                            if !self.control_stream_manager.is_connected(worker_id) {
                                self.control_stream_manager.try_reconnect_worker(worker_id, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                            }
                        }
                    }
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}",
                                (database_id, e.as_report())
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
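    /// Handle the outcome of the in-flight completing task, try to complete any remaining
    /// collected barriers, and finally clear all checkpoint-control state with the given error.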
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
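    /// Clear in-flight barrier state with the given error and enter global recovery,
    /// or panic if recovery is disabled.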
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

    /// Ad-hoc recovery requested externally via `BarrierManagerRequest::AdhocRecovery`:
    /// clear in-flight barrier state and rerun global recovery unconditionally.
    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
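    /// Record a barrier collection failure in the meta event log.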
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

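/// Retry backoff used during recovery: exponential backoff starting from
/// `RECOVERY_RETRY_BASE_INTERVAL` milliseconds, capped at `RECOVERY_RETRY_MAX_INTERVAL`,
/// with jitter applied to each delay.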
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    /// Base interval of the exponential backoff, in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    /// Maximum interval between two retry attempts.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::controller::fragment::InflightFragmentInfo;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
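    /// Run a full recovery pass: clear all control streams, abort scheduled commands and
    /// block new scheduling, reload the runtime info and re-inject initial barriers
    /// (retrying until it succeeds), and finally mark databases that are not still
    /// recovering as ready again. If `is_paused` is true, all data sources stay paused
    /// after recovery until manually resumed.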
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut subscription_infos,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_split_assignment,
            } = runtime_info_snapshot;
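
            // Reset sink coordination and re-establish control streams to all active
            // streaming workers under a fresh term id.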
793
794 self.sink_manager.reset().await;
795 let term_id = Uuid::new_v4().to_string();
796
797 let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
798 let reset_start_time = Instant::now();
799 let unconnected_worker = control_stream_manager
800 .reset(
801 active_streaming_nodes.current(),
802 term_id.clone(),
803 &*self.context,
804 )
805 .await;
806 info!(elapsed=?reset_start_time.elapsed(), ?unconnected_worker, "control stream reset");
807
808 {
809 let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|info| info.values().flatten()), &control_stream_manager);
810 builder.add_relations(&fragment_relations);
811 let mut edges = builder.build();
812
813 let mut collected_databases = HashMap::new();
814 let mut collecting_databases = HashMap::new();
815 let mut failed_databases = HashSet::new();
816 for (database_id, jobs) in database_job_infos {
817 let result = control_stream_manager.inject_database_initial_barrier(
818 database_id,
819 jobs,
820 &mut state_table_committed_epochs,
821 &mut state_table_log_epochs,
822 &mut edges,
823 &stream_actors,
824 &mut source_splits,
825 &mut background_jobs,
826 subscription_infos.remove(&database_id).unwrap_or_default(),
827 is_paused,
828 &hummock_version_stats,
829 &mut cdc_table_snapshot_split_assignment,
830 );
831 let node_to_collect = match result {
832 Ok(info) => {
833 info
834 }
835 Err(e) => {
836 warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
837 assert!(failed_databases.insert(database_id), "non-duplicate");
838 continue;
839 }
840 };
841 if !node_to_collect.is_collected() {
842 assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
843 } else {
844 warn!(database_id = database_id.database_id, "database has no node to inject initial barrier");
845 assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
846 }
847 }
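
                // Collect the initial barrier from all involved workers. A worker error or a
                // reported database failure here only fails the affected databases; the rest
                // of the global recovery continues.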
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response().await;
                    let resp = match result {
                        Err(e) => {
                            warn!(worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, worker_id, "database failed to recover in global recovery due to worker node error");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => resp,
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = DatabaseId::new(resp.database_id);
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("get unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id as WorkerId);
                    let database_id = DatabaseId::new(resp.database_id);
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mview in recovery");
                }
                if !subscription_infos.is_empty() {
                    warn!(?subscription_infos, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.database_id).collect_vec()
                    ).into());
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"))
        .await
        .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.database_id)
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.database_id)
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}