use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::TryFutureExt;
use itertools::Itertools;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{ControlStreamManager, merge_node_rpc_errors};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
    RecoveryReason, schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

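/// The global barrier worker is the long-running loop on the meta node that drives barrier
/// injection and collection across all streaming worker nodes, and performs per-database or
/// global recovery when a database or worker node fails.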
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to attempt recovery on failure; if disabled, failures panic instead.
    enable_recovery: bool,

    /// Scheduler of periodic barriers, tracking per-database barrier intervals and checkpoint
    /// frequencies.
    periodic_barriers: PeriodicBarriers,

    /// The maximum number of barriers allowed in flight at the same time.
    in_flight_barrier_nums: usize,

    /// Whether per-database failure isolation is enabled in the system parameters.
    system_enable_per_database_isolation: bool,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// The barrier-completing task currently in progress, if any.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    sink_manager: SinkCoordinatorManager,

    control_stream_manager: ControlStreamManager,

    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        sink_manager: SinkCoordinatorManager,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;
        let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let periodic_barriers = PeriodicBarriers::default();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            in_flight_barrier_nums,
            system_enable_per_database_isolation,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            sink_manager,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
        ));

        Self::new_inner(env, sink_manager, request_rx, context).await
    }

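    /// Spawn the worker loop on a new tokio task, returning its join handle together with a
    /// oneshot sender that requests a graceful shutdown.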
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

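    /// Check whether all data sources should be paused on this bootstrap. If the
    /// `pause_on_next_bootstrap` system parameter is set, it is reset to `false` so that the
    /// pause applies to this bootstrap only.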
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap();
        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                It will now be reset to `false`. \
                To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

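    /// Entry point of the worker: refuse to start if recovery is disabled while streaming jobs
    /// already exist, perform the bootstrap recovery, then enter the main event loop in
    /// [`Self::run_inner`].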
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                    or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
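    /// Per-database failure isolation is effective only when the system parameter is enabled and
    /// the `DatabaseFailureIsolation` license feature is available.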
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

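    /// The main event loop of the barrier worker, multiplexing over shutdown requests, barrier
    /// manager requests, worker node changes, system parameter changes, barrier completion
    /// results, per-database checkpoint control events, control stream responses, and newly
    /// scheduled barriers.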
    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetDdlProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_ddl_progress();
                                if result_tx.send(progress).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            } => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
                        self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                        }
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                    ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                match self.context.reload_database_runtime_info(database_id).await? {
                                    Some(runtime_info) => {
                                        runtime_info.validate(database_id, &self.active_streaming_nodes).inspect_err(|e| {
                                            warn!(database_id = database_id.database_id, err = ?e.as_report(), ?runtime_info, "reloaded database runtime info failed to validate");
                                        })?;
                                        let workers = InflightFragmentInfo::workers(runtime_info.job_infos.values().flat_map(|job| job.fragment_infos()));
                                        for worker_id in workers {
                                            if !self.control_stream_manager.is_connected(worker_id) {
                                                self.control_stream_manager.try_reconnect_worker(worker_id, entering_initializing.control().inflight_infos(), self.term_id.clone(), &*self.context).await;
                                            }
                                        }
                                        entering_initializing.enter(runtime_info, &mut self.control_stream_manager);
                                    }
                                    _ => {
                                        info!(database_id = database_id.database_id, "database removed after reloading empty runtime info");
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, resp_result) = self.control_stream_manager.next_response() => {
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(worker_id, database_id = database_id.database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportDatabaseFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", resp.database_id))?;
                                }
                                let database_id = DatabaseId::new(resp.database_id);
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(database_id = database_id.database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetDatabase(resp) => {
                                self.checkpoint_control.on_reset_database_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("got unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context),
                    if self
                        .checkpoint_control
                        .can_inject_barrier(self.in_flight_barrier_nums) => {
                    if let Some((Command::CreateStreamingJob { info, .. }, _)) = &new_barrier.command {
                        let worker_ids: HashSet<_> =
                            info.stream_job_fragments.inner
                                .actors_to_create()
                                .flat_map(|(_, _, actors)|
                                    actors.map(|(_, worker_id)| worker_id)
                                )
                                .collect();
                        for worker_id in worker_ids {
                            if !self.control_stream_manager.is_connected(worker_id) {
                                self.control_stream_manager.try_reconnect_worker(worker_id, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                            }
                        }
                    }
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
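    /// Drain any in-progress barrier-completing work so that recovery starts from a clean state,
    /// then clear the checkpoint control with the given error.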
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
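    /// Clear the in-flight state and trigger a global recovery for the given error, or panic if
    /// recovery is disabled.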
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

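    /// Trigger a recovery explicitly requested via [`BarrierManagerRequest::AdhocRecovery`].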
    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
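    /// Send barrier-collection failures to the event log so they can be inspected later.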
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    /// Base interval (in milliseconds) of the exponential backoff between recovery attempts.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    /// Upper bound of the backoff between recovery attempts.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

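    /// Return an infinite iterator of jittered, exponentially growing backoff durations used
    /// between recovery attempts.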
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::controller::fragment::InflightFragmentInfo;

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
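    /// Recover the cluster from the latest committed state: clear the existing control streams,
    /// abort scheduling and mark the cluster blocked, run [`Self::recovery_inner`] until it
    /// succeeds, and finally mark the cluster ready again, except for databases that are still
    /// recovering.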
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

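    /// The recovery attempt loop, retried with jittered exponential backoff until it succeeds:
    /// reload the runtime info, reset all control streams under a fresh term id, inject and
    /// collect an initial barrier for every database, then rebuild the checkpoint control and
    /// periodic barrier state.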
    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let new_state = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;
            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            runtime_info_snapshot.validate().inspect_err(|e| {
                warn!(err = ?e.as_report(), ?runtime_info_snapshot, "reloaded runtime info failed to validate");
            })?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                database_job_infos,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut subscription_infos,
                stream_actors,
                fragment_relations,
                mut source_splits,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
            } = runtime_info_snapshot;

            self.sink_manager.reset().await;
            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
            let reset_start_time = Instant::now();
            let unconnected_worker = control_stream_manager
                .reset(
                    active_streaming_nodes.current(),
                    term_id.clone(),
                    &*self.context,
                )
                .await;
            info!(elapsed=?reset_start_time.elapsed(), ?unconnected_worker, "control stream reset");

            {
                let mut builder = FragmentEdgeBuilder::new(database_job_infos.values().flat_map(|info| info.values().flatten()), &control_stream_manager);
                builder.add_relations(&fragment_relations);
                let mut edges = builder.build();

                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashSet::new();
                for (database_id, jobs) in database_job_infos {
                    let result = control_stream_manager.inject_database_initial_barrier(
                        database_id,
                        jobs,
                        &mut state_table_committed_epochs,
                        &mut state_table_log_epochs,
                        &mut edges,
                        &stream_actors,
                        &mut source_splits,
                        &mut background_jobs,
                        subscription_infos.remove(&database_id).unwrap_or_default(),
                        is_paused,
                        &hummock_version_stats,
                    );
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(database_id = database_id.database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response().await;
                    let resp = match result {
                        Err(e) => {
                            warn!(worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, _) in collecting_databases.extract_if(|_, node_to_collect| {
                                !node_to_collect.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, worker_id, "database failed to recover during global recovery due to worker node error");
                                assert!(failed_databases.insert(failed_database_id));
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportDatabaseFailure(resp) => {
                                    let database_id = DatabaseId::new(resp.database_id);
                                    if collecting_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else if collected_databases.remove(&database_id).is_some() {
                                        warn!(%database_id, worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id));
                                    } else {
                                        assert!(failed_databases.contains(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetDatabase(_)) => {
                                    return Err(anyhow!("got unexpected response: {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id as WorkerId);
                    let database_id = DatabaseId::new(resp.database_id);
                    if failed_databases.contains(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !stream_actors.is_empty() {
                    warn!(actor_ids = ?stream_actors.keys().collect_vec(), "unused stream actors in recovery");
                }
                if !source_splits.is_empty() {
                    warn!(actor_ids = ?source_splits.keys().collect_vec(), "unused actor source splits in recovery");
                }
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.keys().collect_vec(), "unused recovered background mview in recovery");
                }
                if !subscription_infos.is_empty() {
                    warn!(?subscription_infos, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.iter().map(|database_id| database_id.database_id).collect_vec()).into()
                    );
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"))
        .await
        .expect("Retry until recovery success.");

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.database_id)
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.database_id)
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}