use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
    RecoveryReason, schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

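/// The global barrier worker running on the meta node. It schedules and injects barriers
/// into streaming worker nodes through control streams, collects barrier completion
/// responses, and drives recovery (global or per-database) when failures occur.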
pub(super) struct GlobalBarrierWorker<C> {
    /// Enable recovery or not when failure encountered.
    enable_recovery: bool,

    /// Generates periodic barriers and tracks per-database barrier interval and
    /// checkpoint frequency settings.
    periodic_barriers: PeriodicBarriers,

    /// Latest value of the `per_database_isolation` system parameter.
    system_enable_per_database_isolation: bool,

    /// The context providing metadata access, scheduling and recovery operations.
    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    /// Tracks in-flight barriers and per-database checkpoint progress.
    checkpoint_control: CheckpointControl,

    /// The barrier-completion task that is currently running, if any.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    /// The set of active streaming compute nodes, watched for membership changes.
    active_streaming_nodes: ActiveStreamingWorkerNodes,

    sink_manager: SinkCoordinatorManager,

    /// Manages the control streams to all streaming worker nodes.
    control_stream_manager: ControlStreamManager,

    /// Identifier of the current term, regenerated on every recovery.
    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
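    /// Creates a worker from an arbitrary context, reading `enable_recovery` and the
    /// per-database isolation flag from the environment. The worker starts with an
    /// uninitialized node set and an `"uninitialized"` term id until the first recovery.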
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        sink_manager: SinkCoordinatorManager,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let periodic_barriers = PeriodicBarriers::default();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
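    /// Creates a worker backed by the production `GlobalBarrierWorkerContextImpl`,
    /// then delegates to [`Self::new_inner`].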
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
        ));

        Self::new_inner(env, sink_manager, request_rx, context).await
    }

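    /// Spawns the worker loop on a new task and returns its join handle together with a
    /// oneshot sender that can be used to request shutdown.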
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

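    /// Checks whether the cluster should bootstrap with all data sources paused
    /// (system parameter `pause_on_next_bootstrap`). If so, logs a warning and resets
    /// the parameter to `false` so that the pause only applies to this bootstrap.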
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                It will now be reset to `false`. \
                To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

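    /// Performs the bootstrap recovery and then enters the main event loop. If recovery is
    /// disabled, it panics when any streaming job already exists in the metadata.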
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                    or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            let span = tracing::info_span!("bootstrap_recovery");

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
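    /// Returns whether per-database failure isolation is effective: the system parameter
    /// must be enabled and the `DatabaseFailureIsolation` license feature must be available.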
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

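    /// The main event loop of the barrier worker. Each iteration `select!`s over:
    /// - the shutdown signal,
    /// - external requests (DDL progress, ad-hoc recovery, per-database barrier settings),
    /// - changes of the active streaming worker nodes,
    /// - local notifications of system parameter changes,
    /// - completion of the in-flight barrier completing task,
    /// - checkpoint control state-machine events (database entering initializing/running),
    /// - responses and errors from worker control streams,
    /// - newly scheduled barriers to inject.
    ///
    /// Failures are either isolated to a single database (when per-database isolation is
    /// enabled) or escalated to a global `failure_recovery`.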
    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            } => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
                        self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                        }
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                    ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await? {
                                    Some(runtime_info) => {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        let workers = InflightFragmentInfo::workers(runtime_info.job_infos.values().flat_map(|job| job.fragment_infos()));
                                        for worker_id in workers {
                                            if !self.control_stream_manager.is_connected(worker_id) {
                                                self.control_stream_manager.try_reconnect_worker(worker_id, entering_initializing.control().inflight_infos(), self.term_id.clone(), &*self.context).await;
                                            }
                                        }
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    None => {
                                        info!(database_id = database_id.database_id, "database removed after reloading empty runtime info");
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, resp_result) = self.control_stream_manager.next_response() => {
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(worker_id, database_id = database_id.database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = DatabaseId::new(resp.database_id);
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(database_id = database_id.database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("get unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    if let Some((Command::CreateStreamingJob { info, .. }, _)) = &new_barrier.command {
                        let worker_ids: HashSet<_> =
                            info.stream_job_fragments.inner
                                .actors_to_create()
                                .flat_map(|(_, _, actors)|
                                    actors.map(|(_, worker_id)| worker_id)
                                )
                                .collect();
                        for worker_id in worker_ids {
                            if !self.control_stream_manager.is_connected(worker_id) {
                                self.control_stream_manager.try_reconnect_worker(worker_id, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                            }
                        }
                    }
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
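    /// Drains the current completing task and, if it did not fail, keeps completing the
    /// remaining collected barriers before clearing checkpoint state with the given error.
    /// Called at the beginning of recovery.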
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
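    /// Handles an unrecoverable barrier error: clears in-flight state and starts a global
    /// recovery if recovery is enabled, otherwise panics.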
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

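    /// Triggers a global recovery on explicit request (`BarrierManagerRequest::AdhocRecovery`),
    /// regardless of whether a failure has been observed.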
    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
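    /// Records a barrier collection failure in the event log for later inspection.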
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

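/// Backoff strategy used when retrying global recovery: an exponential backoff built with
/// `tokio_retry`, starting from `RECOVERY_RETRY_BASE_INTERVAL` milliseconds, capped at
/// `RECOVERY_RETRY_MAX_INTERVAL`, with jitter applied to every delay. `RetryBackoffStrategy`
/// wraps the same delay sequence into ready-to-await sleep futures.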
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::controller::fragment::InflightFragmentInfo;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
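    /// Performs a global recovery: clears existing control streams, marks all databases as
    /// blocked for the given reason, runs the retried recovery loop, and finally marks the
    /// cluster ready again while keeping still-recovering databases blocked.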
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

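    /// The retried body of global recovery. Each attempt reloads the runtime info snapshot,
    /// resets control streams under a fresh term id, injects an initial barrier per database,
    /// and collects it from all involved workers. Databases that fail are tracked separately
    /// and only abort the attempt when per-database isolation is disabled. Attempts follow
    /// the backoff in [`retry_strategy`] and are retried until one succeeds.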
    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut subscription_infos,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_split_assignment,
            } = runtime_info_snapshot;

            self.sink_manager.reset().await;
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
            let reset_start_time = Instant::now();
            let unconnected_worker = control_stream_manager
                .reset(
                    active_streaming_nodes.current(),
                    term_id.clone(),
                    &*self.context,
                )
                .await;
            info!(elapsed=?reset_start_time.elapsed(), ?unconnected_worker, "control stream reset");

            {
                let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|info| info.values().flatten()), &control_stream_manager);
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                for (database_id, jobs) in database_job_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        subscription_infos.remove(&database_id).unwrap_or_default(),
                        is_paused,
                        &hummock_version_stats,
                        &mut cdc_table_snapshot_split_assignment,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(database_id = database_id.database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response().await;
                    let resp = match result {
                        Err(e) => {
                            warn!(worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, worker_id, "database failed to recover in global recovery due to worker node err");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = DatabaseId::new(resp.database_id);
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("get unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id as WorkerId);
                    let database_id = DatabaseId::new(resp.database_id);
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mview in recovery");
                }
                if !subscription_infos.is_empty() {
                    warn!(?subscription_infos, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.database_id).collect_vec()
                    )
                    .into());
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"))
        .await
        .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.database_id)
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.database_id)
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}