use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::{TryFutureExt, pin_mut};
use itertools::Itertools;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::system_param::{AdaptiveParallelismStrategy, PAUSE_ON_NEXT_BOOTSTRAP_KEY};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::recovery::{RenderedDatabaseRuntimeInfo, render_runtime_info};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::info::InflightDatabaseInfo;
use crate::barrier::rpc::{
    DatabaseInitialBarrierCollector, database_partial_graphs, from_partial_graph_id,
    merge_node_rpc_errors,
};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
    RecoveryReason, RescheduleContext, UpdateDatabaseBarrierRequest, schedule,
};
use crate::controller::scale::render_actor_assignments;
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{
    GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef, build_reschedule_commands,
};
use crate::{MetaError, MetaResult};

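/// The global barrier worker: injects barriers into compute nodes, collects them,
/// completes checkpoints, and drives recovery, with per-database state tracked in
/// [`CheckpointControl`]. Requests from the rest of the meta node arrive through
/// `request_rx`.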
pub(super) struct GlobalBarrierWorker<C> {
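    /// Enable recovery or not when failure encountered.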
    enable_recovery: bool,

    periodic_barriers: PeriodicBarriers,

    system_enable_per_database_isolation: bool,

    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

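    /// The ongoing barrier-completion task, or `CompletingTask::None` when idle.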
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    partial_graph_manager: PartialGraphManager,
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tokio::sync::oneshot;

    use super::*;
    use crate::barrier::RescheduleContext;
    use crate::barrier::notifier::Notifier;

    #[tokio::test]
    async fn test_reschedule_intent_without_workers_notifies_start_failed() {
        let env = MetaSrvEnv::for_test().await;
        let database_id = DatabaseId::new(1);
        let database_info =
            InflightDatabaseInfo::empty(database_id, env.shared_actor_infos().clone());
        let (started_tx, started_rx) = oneshot::channel();
        let (collected_tx, _collected_rx) = oneshot::channel();

        let notifier = Notifier {
            started: Some(started_tx),
            collected: Some(collected_tx),
        };

        let new_barrier = schedule::NewBarrier {
            database_id,
            command: Some((
                Command::RescheduleIntent {
                    context: RescheduleContext::empty(),
                    reschedule_plan: None,
                },
                vec![notifier],
            )),
            span: tracing::Span::none(),
            checkpoint: false,
        };

        // With no active streaming workers, resolving the intent must fail and the
        // `started` notifier must carry the error.
        let result = resolve_reschedule_intent(
            env,
            HashMap::new(),
            AdaptiveParallelismStrategy::default(),
            Some(&database_info),
            new_barrier,
        );

        assert!(matches!(result, Ok(None)));
        let started = started_rx.await.expect("started notifier dropped");
        assert!(started.is_err());
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let partial_graph_manager = PartialGraphManager::uninitialized(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let periodic_barriers = PeriodicBarriers::default();
        let adaptive_parallelism_strategy = reader.adaptive_parallelism_strategy();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            adaptive_parallelism_strategy,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            partial_graph_manager,
        }
    }
}

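/// Resolve a scheduled [`Command::RescheduleIntent`] that carries no concrete plan yet
/// by rendering one against the current set of worker nodes. Returns `Ok(Some(_))` when
/// the barrier should proceed (with the plan filled in), and `Ok(None)` when the barrier
/// is fully handled here: a no-op intent notifies its waiters as started and collected,
/// while a failed plan build notifies them of the failure.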
fn resolve_reschedule_intent(
    env: MetaSrvEnv,
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    database_info: Option<&InflightDatabaseInfo>,
    mut new_barrier: schedule::NewBarrier,
) -> MetaResult<Option<schedule::NewBarrier>> {
    let Some((command, notifiers)) = new_barrier.command.take() else {
        return Ok(Some(new_barrier));
    };

    match command {
        Command::RescheduleIntent {
            context,
            reschedule_plan,
        } => {
            if let Some(reschedule_plan) = reschedule_plan {
                new_barrier.command = Some((
                    Command::RescheduleIntent {
                        context,
                        reschedule_plan: Some(reschedule_plan),
                    },
                    notifiers,
                ));
                return Ok(Some(new_barrier));
            }
            let span = tracing::info_span!(
                "resolve_reschedule_intent",
                database_id = %new_barrier.database_id
            );
            let reschedule_plan = {
                let _guard = span.enter();
                build_reschedule_from_context(
                    &env,
                    worker_nodes,
                    adaptive_parallelism_strategy,
                    new_barrier.database_id,
                    context,
                    database_info.ok_or_else(|| {
                        anyhow!(
                            "database {} not found when resolving reschedule intent",
                            new_barrier.database_id
                        )
                    })?,
                )
            };
            match reschedule_plan {
                Ok(Some(reschedule_plan)) => {
                    new_barrier.command = Some((
                        Command::RescheduleIntent {
                            context: RescheduleContext::empty(),
                            reschedule_plan: Some(reschedule_plan),
                        },
                        notifiers,
                    ));
                    Ok(Some(new_barrier))
                }
                Ok(None) => {
                    for mut notifier in notifiers {
                        notifier.notify_started();
                        notifier.notify_collected();
                    }
                    Ok(None)
                }
                Err(err) => {
                    for notifier in notifiers {
                        notifier.notify_start_failed(err.clone());
                    }
                    Ok(None)
                }
            }
        }
        _ => {
            new_barrier.command = Some((command, notifiers));
            Ok(Some(new_barrier))
        }
    }
}

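/// Render fresh actor assignments for the fragments loaded in `context` and build the
/// reschedule plan for `database_id`. Returns `Ok(None)` when the context is empty or
/// the resulting commands do not touch this database; errors out when there is no
/// active streaming worker to place actors on.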
fn build_reschedule_from_context(
    env: &MetaSrvEnv,
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    database_id: DatabaseId,
    context: RescheduleContext,
    database_info: &InflightDatabaseInfo,
) -> MetaResult<Option<crate::barrier::ReschedulePlan>> {
    if worker_nodes.is_empty() {
        return Err(anyhow!("no active streaming workers for reschedule").into());
    }

    let actor_id_counter = env.actor_id_generator();
    if context.is_empty() {
        return Ok(None);
    }

    let rendered = render_actor_assignments(
        actor_id_counter,
        &worker_nodes,
        adaptive_parallelism_strategy,
        &context.loaded,
    )?;

    let all_prev_fragments = database_info
        .fragment_infos()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();
    let mut commands = build_reschedule_commands(rendered.fragments, context, all_prev_fragments)?;
    Ok(commands.remove(&database_id))
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
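    /// Create a worker backed by the default [`GlobalBarrierWorkerContextImpl`].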
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
        refresh_manager: GlobalRefreshManagerRef,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
            refresh_manager,
            sink_manager,
        ));

        Self::new_inner(env, request_rx, context).await
    }

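    /// Spawn the worker loop, returning its join handle and a sender for signaling
    /// shutdown.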
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

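    /// Check whether we should pause all data sources on bootstrap, resetting the
    /// system parameter `pause_on_next_bootstrap` to `false` once it is consumed.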
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap()
            || self.env.opts.pause_on_next_bootstrap_offline;

        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                It will now be reset to `false`. \
                To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

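    /// Perform bootstrap recovery and then enter the main event loop.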
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                    or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        Box::pin(self.run_inner(shutdown_rx)).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
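    /// Per-database failure isolation takes effect only when the system parameter is
    /// set and the `DatabaseFailureIsolation` license feature is available.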
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

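    /// The main event loop: serves barrier manager requests, reacts to worker
    /// membership and system parameter changes, completes barriers, and injects
    /// newly scheduled ones.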
    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        loop {
            tokio::select! {
                biased;

                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetBackfillProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_backfill_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send backfill progress");
                                }
                            }
                            BarrierManagerRequest::GetFragmentBackfillProgress(result_tx) => {
                                let progress =
                                    self.checkpoint_control.gen_fragment_backfill_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send fragment backfill progress");
                                }
                            }
                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_cdc_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send cdc progress");
                                }
                            }
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier(UpdateDatabaseBarrierRequest {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            }) => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                            BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
                                if tx.send(self.checkpoint_control.may_have_snapshot_backfilling_jobs()).is_err() {
                                    warn!("failed to send whether there may be snapshot backfilling jobs");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream; the meta node may be shutting down. Stop the global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    match changed_worker {
                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
                            self.partial_graph_manager.add_worker(node, &*self.context).await;
                        }
                        ActiveStreamingWorkerChange::Remove(node) => {
                            self.partial_graph_manager.remove_worker(node);
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                        self.periodic_barriers
                            .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                        self.system_enable_per_database_isolation = p.per_database_isolation();
                        self.adaptive_parallelism_strategy = p.adaptive_parallelism_strategy();
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.partial_graph_manager,
                        &self.context,
                        &self.env,
                    ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                let result: MetaResult<_> = try {
                                    let runtime_info = self.context.reload_database_runtime_info(database_id).await.inspect_err(|err| {
                                        warn!(%database_id, err = %err.as_report(), "reload runtime info failed");
                                    })?;
                                    let rendered_info = render_runtime_info(
                                        self.env.actor_id_generator(),
                                        &self.active_streaming_nodes,
                                        self.adaptive_parallelism_strategy,
                                        &runtime_info.recovery_context,
                                        database_id,
                                    )
                                    .inspect_err(|err: &MetaError| {
                                        warn!(%database_id, err = %err.as_report(), "render runtime info failed");
                                    })?;
                                    if let Some(rendered_info) = rendered_info {
                                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
                                            database_id,
                                            &rendered_info.job_infos,
                                            &self.active_streaming_nodes,
                                            &rendered_info.stream_actors,
                                            &runtime_info.state_table_committed_epochs,
                                        )
                                        .inspect_err(|err| {
                                            warn!(%database_id, err = %err.as_report(), "database runtime info failed validation");
                                        })?;
                                        Some((runtime_info, rendered_info))
                                    } else {
                                        None
                                    }
                                };
                                match result {
                                    Ok(Some((runtime_info, rendered_info))) => {
                                        entering_initializing.enter(
                                            runtime_info,
                                            rendered_info,
                                            &mut self.partial_graph_manager,
                                        );
                                    }
                                    Ok(None) => {
                                        info!(%database_id, "database removed after reloading empty runtime info");
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                    Err(e) => {
                                        entering_initializing.fail_reload_runtime_info(e);
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                event = self.partial_graph_manager.next_event(&self.context) => {
                    let result: MetaResult<()> = try {
                        match event {
                            PartialGraphManagerEvent::Worker(_worker_id, WorkerEvent::WorkerConnected) => {}
                            PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
                                let failed_databases = self
                                    .checkpoint_control
                                    .databases_failed_at_worker_err(worker_id)
                                    .chain(
                                        affected_partial_graphs
                                            .into_iter()
                                            .map(|partial_graph_id| {
                                                let (database_id, _) = from_partial_graph_id(partial_graph_id);
                                                database_id
                                            })
                                    )
                                    .collect::<HashSet<_>>();
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
                                        }
                                    }
                                } else {
                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
                                let (database_id, _creating_job_id) = from_partial_graph_id(partial_graph_id);
                                match event {
                                    PartialGraphEvent::BarrierCollected(collected_barrier) => {
                                        self.checkpoint_control.barrier_collected(partial_graph_id, collected_barrier, &mut self.periodic_barriers)?;
                                    }
                                    PartialGraphEvent::Error(worker_id) => {
                                        if !self.enable_recovery {
                                            panic!("database {database_id} failure reported from {worker_id} but recovery not enabled")
                                        }
                                        if !self.enable_per_database_isolation() {
                                            Err(anyhow!("database {database_id} report failure from {worker_id}"))?;
                                        }
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
                                            warn!(%database_id, "database entering recovery");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
                                        }
                                    }
                                    PartialGraphEvent::Reset(reset_resps) => {
                                        self.checkpoint_control.on_partial_graph_reset(partial_graph_id, reset_resps);
                                    }
                                    PartialGraphEvent::Initialized => {
                                        self.checkpoint_control.on_partial_graph_initialized(partial_graph_id);
                                    }
                                }
                            }
                        };
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    let database_id = new_barrier.database_id;
                    let new_barrier = if matches!(
                        new_barrier.command,
                        Some((Command::RescheduleIntent { .. }, _))
                    ) {
                        let env = self.env.clone();
                        let worker_nodes = self
                            .active_streaming_nodes
                            .current()
                            .iter()
                            .map(|(worker_id, worker)| (*worker_id, worker.clone()))
                            .collect();
                        let adaptive_parallelism_strategy = self.adaptive_parallelism_strategy;
                        let database_info = self.checkpoint_control.database_info(database_id);
                        match resolve_reschedule_intent(
                            env,
                            worker_nodes,
                            adaptive_parallelism_strategy,
                            database_info,
                            new_barrier,
                        ) {
                            Ok(Some(new_barrier)) => new_barrier,
                            Ok(None) => continue,
                            Err(err) => {
                                self.failure_recovery(err).await;
                                continue;
                            }
                        }
                    } else {
                        new_barrier
                    };
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(new_barrier, &mut self.partial_graph_manager) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.partial_graph_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
            self.checkpoint_control.update_barrier_nums_metrics();
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
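    /// Drain the in-flight completing task and any remaining complete-barrier tasks,
    /// then clear the checkpoint state with the given error.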
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        let is_err = match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None => false,
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                        true
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                        true
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                        false
                    }
                }
            }
            CompletingTask::Err(_) => true,
        };
        if !is_err {
            while let Some(task) = self.checkpoint_control.next_complete_barrier_task(None) {
                let epochs_to_ack = task.epochs_to_ack();
                match task
                    .complete_barrier(&*self.context, self.env.clone())
                    .await
                {
                    Ok(hummock_version_stats) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                    Err(e) => {
                        error!(
                            err = %e.as_report(),
                            "failed to complete barrier during recovery"
                        );
                        break;
                    }
                }
            }
        }
        self.checkpoint_control.clear_on_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
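    /// Clear the current state and trigger a full recovery, or panic if recovery is
    /// disabled.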
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

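    /// Trigger a recovery explicitly requested via
    /// [`BarrierManagerRequest::AdhocRecovery`].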
    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
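    /// Record a barrier-collection failure in the event log.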
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

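    /// Base interval of the exponential recovery backoff, in milliseconds.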
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
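    /// Upper bound for a single recovery backoff delay.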
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

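    /// A boxed sleep future produced from one backoff delay.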
    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::partial_graph::{
    PartialGraphEvent, PartialGraphManager, PartialGraphManagerEvent, WorkerEvent,
};

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
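    /// Recover the cluster from the latest committed epoch.
    ///
    /// If `is_paused` is true, all data sources will stay paused after recovery until
    /// manually resumed (e.g. via `risectl meta resume`).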
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        self.partial_graph_manager.clear_worker();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let recovery_future = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
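            // Notify creating jobs of the failure before reloading runtime info, so
            // their waiters observe the recovery reason instead of hanging.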
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;

            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                recovery_context,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut mv_depended_subscriptions,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_splits,
            } = runtime_info_snapshot;

            let mut partial_graph_manager = PartialGraphManager::recover(
                self.env.clone(),
                active_streaming_nodes.current(),
                self.context.clone(),
            )
            .await;
            {
                let mut empty_databases = HashSet::new();
                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashMap::new();
                for &database_id in recovery_context.fragment_context.database_map.keys() {
                    let mut recoverer = partial_graph_manager.start_recover();
                    let result: MetaResult<_> = try {
                        let Some(rendered_info) = render_runtime_info(
                            self.env.actor_id_generator(),
                            &active_streaming_nodes,
                            self.adaptive_parallelism_strategy,
                            &recovery_context,
                            database_id,
                        )
                        .inspect_err(|err: &MetaError| {
                            warn!(%database_id, err = %err.as_report(), "render runtime info failed");
                        })? else {
                            empty_databases.insert(database_id);
                            continue;
                        };
                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
                            database_id,
                            &rendered_info.job_infos,
                            &active_streaming_nodes,
                            &rendered_info.stream_actors,
                            &state_table_committed_epochs,
                        )
                        .inspect_err(|err| {
                            warn!(%database_id, err = %err.as_report(), "rendered runtime info failed validation");
                        })?;
                        let RenderedDatabaseRuntimeInfo {
                            job_infos,
                            stream_actors,
                            mut source_splits,
                        } = rendered_info;
                        recoverer.inject_database_initial_barrier(
                            database_id,
                            job_infos,
                            &recovery_context.job_extra_info,
                            &mut state_table_committed_epochs,
                            &mut state_table_log_epochs,
                            &recovery_context.fragment_relations,
                            &stream_actors,
                            &mut source_splits,
                            &mut background_jobs,
                            &mut mv_depended_subscriptions,
                            is_paused,
                            &hummock_version_stats,
                            &mut cdc_table_snapshot_splits,
                        )?
                    };
                    let collector = match result {
                        Ok(database) => {
                            DatabaseInitialBarrierCollector {
                                database_id,
                                initializing_partial_graphs: recoverer.all_initializing(),
                                database,
                            }
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id, recoverer.failed()).is_none(), "non-duplicate");
                            continue;
                        }
                    };
                    if !collector.is_collected() {
                        assert!(collecting_databases.insert(database_id, collector).is_none());
                    } else {
                        warn!(%database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, collector.finish()).is_none());
                    }
                }
                if !empty_databases.is_empty() {
                    info!(?empty_databases, "empty database in global recovery");
                }
                while !collecting_databases.is_empty() {
                    match partial_graph_manager.next_event(&self.context).await {
                        PartialGraphManagerEvent::Worker(_, WorkerEvent::WorkerConnected) => {}
                        PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
                            let affected_databases: HashSet<_> = affected_partial_graphs.into_iter().map(|partial_graph_id| {
                                let (database_id, _) = from_partial_graph_id(partial_graph_id);
                                database_id
                            }).collect();
                            warn!(%worker_id, err = %err.as_report(), "worker node failure during recovery");
                            for (failed_database_id, collector) in collecting_databases.extract_if(|database_id, collector| {
                                !collector.is_valid_after_worker_err(worker_id) || affected_databases.contains(database_id)
                            }) {
                                warn!(%failed_database_id, %worker_id, "database failed to recover in global recovery due to worker node error");
                                let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
                                partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
                                assert!(failed_databases.insert(failed_database_id, resetting_partial_graphs).is_none());
                            }
                        }
                        PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
                            match event {
                                PartialGraphEvent::BarrierCollected(_) => {
                                    unreachable!("no barrier collected event on initializing")
                                }
                                PartialGraphEvent::Reset(_) => {
                                    unreachable!("no partial graph reset on initializing")
                                }
                                PartialGraphEvent::Error(worker_id) => {
                                    let (database_id, _) = from_partial_graph_id(partial_graph_id);
                                    if let Some(collector) = collecting_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database reset during global recovery");
                                        let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
                                        partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
                                        assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
                                    } else if let Some(database) = collected_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
                                        let resetting_partial_graphs: HashSet<_> = database_partial_graphs(database_id, database.creating_streaming_job_controls.keys().copied()).collect();
                                        partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
                                        assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
                                    } else {
                                        assert!(failed_databases.contains_key(&database_id));
                                    }
                                }
                                PartialGraphEvent::Initialized => {
                                    let (database_id, _) = from_partial_graph_id(partial_graph_id);
                                    if failed_databases.contains_key(&database_id) {
                                        assert!(!collecting_databases.contains_key(&database_id));
                                        continue;
                                    }
                                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                                        unreachable!("should exist")
                                    };
                                    let collector = entry.get_mut();
                                    collector.partial_graph_initialized(partial_graph_id);
                                    if collector.is_collected() {
                                        let collector = entry.remove();
                                        assert!(collected_databases.insert(database_id, collector.finish()).is_none());
                                    }
                                }
                            }
                        }
                    }
                }
                debug!("collected initial barrier");
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background jobs in recovery");
                }
                if !mv_depended_subscriptions.is_empty() {
                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epochs in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.keys().collect_vec()
                    )
                    .into());
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    partial_graph_manager,
                    checkpoint_control,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"));
        let mut recover_txs = vec![];
        let mut update_barrier_requests = vec![];
        pin_mut!(recovery_future);
        let mut request_rx_closed = false;
        let new_state = loop {
            select! {
                biased;
                new_state = &mut recovery_future => {
                    break new_state.expect("Retry until recovery success.");
                }
                request = pin!(self.request_rx.recv()), if !request_rx_closed => {
                    let Some(request) = request else {
                        warn!("request rx channel closed during recovery");
                        request_rx_closed = true;
                        continue;
                    };
                    match request {
                        BarrierManagerRequest::GetBackfillProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::GetFragmentBackfillProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::GetCdcProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::AdhocRecovery(tx) => {
                            recover_txs.push(tx);
                        }
                        BarrierManagerRequest::UpdateDatabaseBarrier(request) => {
                            update_barrier_requests.push(request);
                        }
                        BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
                            let _ = tx.send(true);
                        }
                    }
                }
            }
        };

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.partial_graph_manager,
            self.checkpoint_control,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        for UpdateDatabaseBarrierRequest {
            database_id,
            barrier_interval_ms,
            checkpoint_frequency,
            sender,
        } in update_barrier_requests
        {
            self.periodic_barriers.update_database_barrier(
                database_id,
                barrier_interval_ms,
                checkpoint_frequency,
            );
            let _ = sender.send(());
        }

        for tx in recover_txs {
            let _ = tx.send(());
        }

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
1335}