use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::{TryFutureExt, pin_mut};
use itertools::Itertools;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::system_param::{AdaptiveParallelismStrategy, PAUSE_ON_NEXT_BOOTSTRAP_KEY};
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::stream_service::streaming_control_stream_response::Response;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};
use uuid::Uuid;

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::recovery::{RenderedDatabaseRuntimeInfo, render_runtime_info};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::rpc::{
    ControlStreamManager, WorkerNodeEvent, from_partial_graph_id, merge_node_rpc_errors,
};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, RecoveryReason,
    UpdateDatabaseBarrierRequest, schedule,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
use crate::{MetaError, MetaResult};

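/// Drives the global barrier loop on the meta node: it injects barriers, collects them
/// from compute nodes, completes checkpoints, and enters recovery when a database or a
/// worker fails. All per-term state (checkpoint control, control streams to workers,
/// and the periodic barrier scheduler) lives here and is rebuilt on every recovery.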
pub(super) struct GlobalBarrierWorker<C> {
    enable_recovery: bool,

    periodic_barriers: PeriodicBarriers,

    system_enable_per_database_isolation: bool,

    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    control_stream_manager: ControlStreamManager,

    term_id: String,
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
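    /// Builds a worker that is not yet connected to any compute node: the control
    /// stream manager and checkpoint control start empty, and `term_id` holds a
    /// placeholder value until the first recovery assigns a real term.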
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let control_stream_manager = ControlStreamManager::new(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let periodic_barriers = PeriodicBarriers::default();
        let adaptive_parallelism_strategy = reader.adaptive_parallelism_strategy();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            adaptive_parallelism_strategy,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            control_stream_manager,
            term_id: "uninitialized".into(),
        }
    }
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
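    /// Assembles the production `GlobalBarrierWorkerContextImpl` from the meta-node
    /// managers, with the barrier manager status initialized to `Starting`, and then
    /// delegates to `new_inner`.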
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
        refresh_manager: GlobalRefreshManagerRef,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
            refresh_manager,
            sink_manager,
        ));

        Self::new_inner(env, request_rx, context).await
    }

    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

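    /// Returns whether the cluster should bootstrap with all data sources paused,
    /// either because the `pause_on_next_bootstrap` system parameter is set or because
    /// the meta node was started with `pause_on_next_bootstrap_offline`. The system
    /// parameter is reset to `false` once consumed.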
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap()
            || self.env.opts.pause_on_next_bootstrap_offline;

        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                It will now be reset to `false`. \
                To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

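    /// Entry point of the barrier worker: runs the bootstrap recovery (panicking if
    /// recovery is disabled while streaming jobs already exist in meta), then hands
    /// over to `run_inner` for the main event loop.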
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                    or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        self.run_inner(shutdown_rx).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
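    /// Per-database failure isolation takes effect only when both the system parameter
    /// is enabled and the `DatabaseFailureIsolation` license feature is available;
    /// otherwise a database failure escalates to a global failure recovery.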
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

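    /// Main event loop. A biased `select!` multiplexes, in priority order: shutdown,
    /// external requests, worker membership changes, system parameter updates, barrier
    /// completion, per-database checkpoint state transitions, control stream events,
    /// and periodic barrier injection. Errors that cannot be isolated to a single
    /// database fall through to `failure_recovery`.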
    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        loop {
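            // `biased` keeps the arms below polled in declaration order, so shutdown
            // and external requests take precedence over barrier traffic.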
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetBackfillProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_backfill_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send backfill progress");
                                }
                            }
                            BarrierManagerRequest::GetFragmentBackfillProgress(result_tx) => {
                                let progress =
                                    self.checkpoint_control.gen_fragment_backfill_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send fragment backfill progress");
                                }
                            }
                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_cdc_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send cdc progress");
                                }
                            }
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier(UpdateDatabaseBarrierRequest {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            }) => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                            BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
                                if tx.send(self.checkpoint_control.may_have_snapshot_backfilling_jobs()).is_err() {
                                    warn!("failed to send may-have-snapshot-backfilling-job result");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    match changed_worker {
                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
                            self.control_stream_manager.add_worker(node, self.checkpoint_control.inflight_infos(), self.term_id.clone(), &*self.context).await;
                        }
                        ActiveStreamingWorkerChange::Remove(node) => {
                            self.control_stream_manager.remove_worker(node);
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                        self.periodic_barriers
                            .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                        self.system_enable_per_database_isolation = p.per_database_isolation();
                        self.adaptive_parallelism_strategy = p.adaptive_parallelism_strategy();
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.control_stream_manager,
                        &self.context,
                        &self.env,
                    ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                let result: MetaResult<_> = try {
                                    let runtime_info = self.context.reload_database_runtime_info(database_id).await.inspect_err(|err| {
                                        warn!(%database_id, err = %err.as_report(), "reload runtime info failed");
                                    })?;
                                    let rendered_info = render_runtime_info(
                                        self.env.actor_id_generator(),
                                        &self.active_streaming_nodes,
                                        self.adaptive_parallelism_strategy,
                                        &runtime_info.recovery_context,
                                        database_id,
                                    )
                                    .inspect_err(|err: &MetaError| {
                                        warn!(%database_id, err = %err.as_report(), "render runtime info failed");
                                    })?;
                                    if let Some(rendered_info) = rendered_info {
                                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
                                            database_id,
                                            &rendered_info.job_infos,
                                            &self.active_streaming_nodes,
                                            &rendered_info.stream_actors,
                                            &runtime_info.state_table_committed_epochs,
                                        )
                                        .inspect_err(|err| {
                                            warn!(%database_id, err = ?err.as_report(), "database runtime info failed validation");
                                        })?;
                                        Some((runtime_info, rendered_info))
                                    } else {
                                        None
                                    }
                                };
                                match result {
                                    Ok(Some((runtime_info, rendered_info))) => {
                                        entering_initializing.enter(
                                            runtime_info,
                                            rendered_info,
                                            &mut self.control_stream_manager,
                                        );
                                    }
                                    Ok(None) => {
                                        info!(%database_id, "database removed after reloading empty runtime info");
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                    Err(e) => {
                                        entering_initializing.fail_reload_runtime_info(e);
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                (worker_id, event) = self.control_stream_manager.next_event(&self.term_id, &self.context) => {
                    let resp_result = match event {
                        WorkerNodeEvent::Response(result) => {
                            result
                        }
                        WorkerNodeEvent::Connected(connected) => {
                            connected.initialize(self.checkpoint_control.inflight_infos());
                            continue;
                        }
                    };
                    let result: MetaResult<()> = try {
                        let resp = match resp_result {
                            Err(err) => {
                                let failed_databases = self.checkpoint_control.databases_failed_at_worker_err(worker_id);
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {:?}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.control_stream_manager);
                                        }
                                    }
                                } else {
                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            Ok(resp) => resp,
                        };
                        match resp {
                            Response::CompleteBarrier(resp) => {
                                self.checkpoint_control.barrier_collected(resp, &mut self.periodic_barriers)?;
                            },
                            Response::ReportPartialGraphFailure(resp) => {
                                if !self.enable_recovery {
                                    panic!("database failure reported but recovery not enabled: {:?}", resp)
                                }
                                let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
                                if !self.enable_per_database_isolation() {
                                    Err(anyhow!("database {} reset", database_id))?;
                                }
                                if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                    warn!(%database_id, "database entering recovery");
                                    self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                    let output = self.completing_task.wait_completing_task().await?;
                                    entering_recovery.enter(output, &mut self.control_stream_manager);
                                }
                            }
                            Response::ResetPartialGraph(resp) => {
                                self.checkpoint_control.on_reset_partial_graph_resp(worker_id, resp);
                            }
                            other @ Response::Init(_) | other @ Response::Shutdown(_) => {
                                Err(anyhow!("get unexpected response: {:?}", other))?;
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    let database_id = new_barrier.database_id;
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.control_stream_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.control_stream_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.control_stream_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
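    /// Drains in-flight completion work before a recovery: waits for the current
    /// completing task, then tries to finish the remaining complete-barrier tasks so
    /// that already-collected barriers are not lost, and finally clears the checkpoint
    /// control state with the given error.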
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
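    /// Global failure path: clears pending completion work and, when recovery is
    /// enabled, runs a full recovery with the error as the failover reason; otherwise
    /// the meta node panics.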
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

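    /// Recovery triggered explicitly via `BarrierManagerRequest::AdhocRecovery`;
    /// unlike `failure_recovery`, it runs even when `enable_recovery` is false.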
    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
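    /// Records a barrier collection failure in the meta event log for later inspection.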
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

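/// Backoff policy between recovery attempts: exponential backoff from a 20 ms base,
/// capped at 5 s per retry, with jitter applied to avoid synchronized retries.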
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_pb::meta::event_log::{Event, EventRecovery};

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
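    /// Full recovery entry point: clears the control streams, marks the whole cluster
    /// blocked with the given reason, reloads and re-injects runtime state via
    /// `recovery_inner`, and finally marks the cluster ready again, keeping databases
    /// that are still recovering in the blocked set.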
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        self.control_stream_manager.clear();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

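    /// One global recovery, retried with `get_retry_strategy` until it succeeds:
    /// reloads the runtime snapshot, renders and validates per-database runtime info,
    /// injects the initial barriers under a fresh `term_id`, and collects them from all
    /// workers. Failed databases are either tolerated (with per-database isolation) or
    /// abort the attempt. Requests arriving during recovery are answered with an error
    /// or deferred until the new state is installed.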
    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let recovery_future = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;

            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                recovery_context,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut mv_depended_subscriptions,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_splits,
            } = runtime_info_snapshot;

            let term_id = Uuid::new_v4().to_string();

            let mut control_stream_manager = ControlStreamManager::recover(
                self.env.clone(),
                active_streaming_nodes.current(),
                &term_id,
                self.context.clone(),
            )
            .await;
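            // Render and validate each database's runtime info, then inject its initial
            // barrier. Databases end up in one of four buckets: empty (no jobs),
            // collecting (waiting for worker responses), collected (done), or failed
            // (to be recovered separately).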
            {
                let mut empty_databases = HashSet::new();
                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashMap::new();
                for &database_id in recovery_context.fragment_context.database_map.keys() {
                    let mut injected_creating_jobs = HashSet::new();
                    let result: MetaResult<_> = try {
                        let Some(rendered_info) = render_runtime_info(
                            self.env.actor_id_generator(),
                            &active_streaming_nodes,
                            self.adaptive_parallelism_strategy,
                            &recovery_context,
                            database_id,
                        )
                        .inspect_err(|err: &MetaError| {
                            warn!(%database_id, err = %err.as_report(), "render runtime info failed");
                        })? else {
                            empty_databases.insert(database_id);
                            continue;
                        };
                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
                            database_id,
                            &rendered_info.job_infos,
                            &active_streaming_nodes,
                            &rendered_info.stream_actors,
                            &state_table_committed_epochs,
                        )
                        .inspect_err(|err| {
                            warn!(%database_id, err = %err.as_report(), "rendered runtime info failed validation");
                        })?;
                        let RenderedDatabaseRuntimeInfo {
                            job_infos,
                            stream_actors,
                            mut source_splits,
                        } = rendered_info;
                        control_stream_manager.inject_database_initial_barrier(
                            database_id,
                            job_infos,
                            &recovery_context.job_extra_info,
                            &mut state_table_committed_epochs,
                            &mut state_table_log_epochs,
                            &recovery_context.fragment_relations,
                            &stream_actors,
                            &mut source_splits,
                            &mut background_jobs,
                            &mut mv_depended_subscriptions,
                            is_paused,
                            &hummock_version_stats,
                            &mut cdc_table_snapshot_splits,
                            &mut injected_creating_jobs,
                        )?
                    };
                    let node_to_collect = match result {
                        Ok(info) => {
                            info
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id, injected_creating_jobs).is_none(), "non-duplicate");
                            continue;
                        }
                    };
                    if !node_to_collect.is_collected() {
                        assert!(collecting_databases.insert(database_id, node_to_collect).is_none());
                    } else {
                        warn!(%database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                if !empty_databases.is_empty() {
                    info!(?empty_databases, "empty database in global recovery");
                }
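                // Wait until every collecting database has reported its initial
                // barrier. A worker error or a reported partial-graph failure moves the
                // affected databases into `failed_databases` instead of failing the
                // whole attempt.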
                while !collecting_databases.is_empty() {
                    let (worker_id, result) =
                        control_stream_manager.next_response(&term_id, &self.context).await;
                    let resp = match result {
                        Err(e) => {
                            warn!(%worker_id, err = %e.as_report(), "worker node failure during recovery");
                            for (failed_database_id, collector) in collecting_databases.extract_if(|_, collector| {
                                !collector.is_valid_after_worker_err(worker_id)
                            }) {
                                warn!(%failed_database_id, %worker_id, "database failed to recover in global recovery due to worker node err");
                                assert!(failed_databases.insert(failed_database_id, collector.creating_job_ids().collect()).is_none());
                            }
                            continue;
                        }
                        Ok(resp) => {
                            match resp {
                                Response::CompleteBarrier(resp) => {
                                    resp
                                }
                                Response::ReportPartialGraphFailure(resp) => {
                                    let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
                                    if let Some(collector) = collecting_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database reset during global recovery");
                                        assert!(failed_databases.insert(database_id, collector.creating_job_ids().collect()).is_none());
                                    } else if let Some(database) = collected_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
                                        assert!(failed_databases.insert(database_id, database.creating_streaming_job_controls.keys().copied().collect()).is_none());
                                    } else {
                                        assert!(failed_databases.contains_key(&database_id));
                                    }
                                    continue;
                                }
                                other @ (Response::Init(_) | Response::Shutdown(_) | Response::ResetPartialGraph(_)) => {
                                    return Err(anyhow!("get unexpected resp {:?}", other).into());
                                }
                            }
                        }
                    };
                    assert_eq!(worker_id, resp.worker_id);
                    let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
                    if failed_databases.contains_key(&database_id) {
                        assert!(!collecting_databases.contains_key(&database_id));
                        continue;
                    }
                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                        unreachable!("should exist")
                    };
                    let node_to_collect = entry.get_mut();
                    node_to_collect.collect_resp(resp);
                    if node_to_collect.is_collected() {
                        let node_to_collect = entry.remove();
                        assert!(collected_databases.insert(database_id, node_to_collect.finish()).is_none());
                    }
                }
                debug!("collected initial barrier");
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background mview in recovery");
                }
                if !mv_depended_subscriptions.is_empty() {
                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.keys().collect_vec()
                    )
                    .into());
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    &mut control_stream_manager,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    control_stream_manager,
                    checkpoint_control,
                    term_id,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"));

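        // While the retry loop runs, keep draining `request_rx`: progress queries get
        // an immediate error, while adhoc-recovery acks and database barrier updates
        // are deferred and replayed after the new state below is installed.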
        let mut recover_txs = vec![];
        let mut update_barrier_requests = vec![];
        pin_mut!(recovery_future);
        let mut request_rx_closed = false;
        let new_state = loop {
            select! {
                biased;
                new_state = &mut recovery_future => {
                    break new_state.expect("Retry until recovery success.");
                }
                request = pin!(self.request_rx.recv()), if !request_rx_closed => {
                    let Some(request) = request else {
                        warn!("request rx channel closed during recovery");
                        request_rx_closed = true;
                        continue;
                    };
                    match request {
                        BarrierManagerRequest::GetBackfillProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::GetFragmentBackfillProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::GetCdcProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::AdhocRecovery(tx) => {
                            recover_txs.push(tx);
                        }
                        BarrierManagerRequest::UpdateDatabaseBarrier(request) => {
                            update_barrier_requests.push(request);
                        }
                        BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
                            let _ = tx.send(true);
                        }
                    }
                }
            }
        };

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.control_stream_manager,
            self.checkpoint_control,
            self.term_id,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        for UpdateDatabaseBarrierRequest {
            database_id,
            barrier_interval_ms,
            checkpoint_frequency,
            sender,
        } in update_barrier_requests
        {
            self.periodic_barriers.update_database_barrier(
                database_id,
                barrier_interval_ms,
                checkpoint_frequency,
            );
            let _ = sender.send(());
        }

        for tx in recover_txs {
            let _ = tx.send(());
        }

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}