1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::pin::pin;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use arc_swap::ArcSwap;
24use futures::{TryFutureExt, pin_mut};
25use itertools::Itertools;
26use risingwave_common::catalog::DatabaseId;
27use risingwave_common::system_param::reader::SystemParamsRead;
28use risingwave_common::system_param::{AdaptiveParallelismStrategy, PAUSE_ON_NEXT_BOOTSTRAP_KEY};
29use risingwave_meta_model::WorkerId;
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::meta::Recovery;
32use risingwave_pb::meta::subscribe_response::{Info, Operation};
33use thiserror_ext::AsReport;
34use tokio::select;
35use tokio::sync::mpsc;
36use tokio::sync::oneshot::{Receiver, Sender};
37use tokio::task::JoinHandle;
38use tonic::Status;
39use tracing::{Instrument, debug, error, info, warn};
40
41use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
42use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
43use crate::barrier::context::recovery::{RenderedDatabaseRuntimeInfo, render_runtime_info};
44use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
45use crate::barrier::info::InflightDatabaseInfo;
46use crate::barrier::rpc::{
47 DatabaseInitialBarrierCollector, database_partial_graphs, from_partial_graph_id,
48 merge_node_rpc_errors,
49};
50use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
51use crate::barrier::{
52 BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
53 RecoveryReason, RescheduleContext, UpdateDatabaseBarrierRequest, schedule,
54};
55use crate::controller::scale::{materialize_actor_assignments, preview_actor_assignments};
56use crate::error::MetaErrorInner;
57use crate::hummock::HummockManagerRef;
58use crate::manager::sink_coordination::SinkCoordinatorManager;
59use crate::manager::{
60 ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
61 MetadataManager,
62};
63use crate::rpc::metrics::GLOBAL_META_METRICS;
64use crate::stream::{
65 GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef, build_reschedule_commands,
66 rendered_layout_matches_current,
67};
68use crate::{MetaError, MetaResult};
69
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to run recovery on failure; read from `env.opts.enable_recovery`.
    /// When `false`, any streaming failure panics the worker instead.
    enable_recovery: bool,

    /// Periodic barrier/checkpoint tick sources, with system-wide defaults and
    /// per-database overrides (see `update_database_barrier` handling in the loop).
    periodic_barriers: PeriodicBarriers,

    /// Cached value of the `per_database_isolation` system parameter; combined
    /// with a license check in `enable_per_database_isolation()`.
    system_enable_per_database_isolation: bool,

    /// Strategy used when rendering adaptive parallelism for actor assignments.
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,

    /// Shared context used to interact with the rest of the meta service.
    pub(super) context: Arc<C>,

    /// Meta service environment (options, system params, notification manager, ...).
    env: MetaSrvEnv,

    /// Per-database barrier/checkpoint state machine.
    checkpoint_control: CheckpointControl,

    /// The in-flight barrier-completion task, or `None` when idle.
    completing_task: CompletingTask,

    /// Requests from the barrier manager handle (progress queries, adhoc
    /// recovery, per-database barrier settings, ...).
    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    /// Currently active streaming compute nodes, watched for membership changes.
    active_streaming_nodes: ActiveStreamingWorkerNodes,

    /// Manages per-worker control streams and the partial graphs running on them.
    partial_graph_manager: PartialGraphManager,
}
107
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tokio::sync::oneshot;

    use super::*;
    use crate::barrier::RescheduleContext;
    use crate::barrier::notifier::Notifier;

    /// A reschedule intent scheduled while no streaming workers are registered
    /// must be swallowed (`Ok(None)`) and its `started` notifier must observe an
    /// error rather than being silently dropped.
    #[tokio::test]
    async fn test_reschedule_intent_without_workers_notifies_start_failed() {
        let env = MetaSrvEnv::for_test().await;
        let database_id = DatabaseId::new(1);
        let database_info =
            InflightDatabaseInfo::empty(database_id, env.shared_actor_infos().clone());

        // Channels observing the notifier lifecycle; only `started` is awaited.
        let (started_tx, started_rx) = oneshot::channel();
        let (collected_tx, _collected_rx) = oneshot::channel();
        let notifier = Notifier {
            started: Some(started_tx),
            collected: Some(collected_tx),
        };

        // A barrier carrying an unresolved reschedule intent.
        let barrier = schedule::NewBarrier {
            database_id,
            command: Some((
                Command::RescheduleIntent {
                    context: RescheduleContext::empty(),
                    reschedule_plan: None,
                },
                vec![notifier],
            )),
            span: tracing::Span::none(),
            checkpoint: false,
        };

        // No workers available -> resolution must fail and fail the notifier.
        let outcome = resolve_reschedule_intent(
            env,
            HashMap::new(),
            AdaptiveParallelismStrategy::default(),
            Some(&database_info),
            barrier,
        );

        assert!(matches!(outcome, Ok(None)));
        let started = started_rx.await.expect("started notifier dropped");
        assert!(started.is_err());
    }
}
158
159impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
160 pub(super) async fn new_inner(
161 env: MetaSrvEnv,
162 request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
163 context: Arc<C>,
164 ) -> Self {
165 let enable_recovery = env.opts.enable_recovery;
166
167 let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();
168
169 let partial_graph_manager = PartialGraphManager::uninitialized(env.clone());
170
171 let reader = env.system_params_reader().await;
172 let system_enable_per_database_isolation = reader.per_database_isolation();
173 let periodic_barriers = PeriodicBarriers::default();
175 let adaptive_parallelism_strategy = reader.adaptive_parallelism_strategy();
176
177 let checkpoint_control = CheckpointControl::new(env.clone());
178 Self {
179 enable_recovery,
180 periodic_barriers,
181 system_enable_per_database_isolation,
182 adaptive_parallelism_strategy,
183 context,
184 env,
185 checkpoint_control,
186 completing_task: CompletingTask::None,
187 request_rx,
188 active_streaming_nodes,
189 partial_graph_manager,
190 }
191 }
192}
193
194fn resolve_reschedule_intent(
195 env: MetaSrvEnv,
196 worker_nodes: HashMap<WorkerId, WorkerNode>,
197 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
198 database_info: Option<&InflightDatabaseInfo>,
199 mut new_barrier: schedule::NewBarrier,
200) -> MetaResult<Option<schedule::NewBarrier>> {
201 let Some((command, notifiers)) = new_barrier.command.take() else {
202 return Ok(Some(new_barrier));
203 };
204
205 match command {
206 Command::RescheduleIntent {
207 context,
208 reschedule_plan,
209 } => {
210 if let Some(reschedule_plan) = reschedule_plan {
211 new_barrier.command = Some((
212 Command::RescheduleIntent {
213 context,
214 reschedule_plan: Some(reschedule_plan),
215 },
216 notifiers,
217 ));
218 return Ok(Some(new_barrier));
219 }
220 let span = tracing::info_span!(
221 "resolve_reschedule_intent",
222 database_id = %new_barrier.database_id
223 );
224 let reschedule_plan = {
225 let _guard = span.enter();
226 build_reschedule_from_context(
227 &env,
228 worker_nodes,
229 adaptive_parallelism_strategy,
230 new_barrier.database_id,
231 context,
232 database_info.ok_or_else(|| {
233 anyhow!(
234 "database {} not found when resolving reschedule intent",
235 new_barrier.database_id
236 )
237 })?,
238 )
239 };
240 match reschedule_plan {
241 Ok(Some(reschedule_plan)) => {
242 new_barrier.command = Some((
243 Command::RescheduleIntent {
244 context: RescheduleContext::empty(),
245 reschedule_plan: Some(reschedule_plan),
246 },
247 notifiers,
248 ));
249 Ok(Some(new_barrier))
250 }
251 Ok(None) => {
252 for mut notifier in notifiers {
254 notifier.notify_started();
255 notifier.notify_collected();
256 }
257 Ok(None)
258 }
259 Err(err) => {
260 for notifier in notifiers {
261 notifier.notify_start_failed(err.clone());
262 }
263 Ok(None)
264 }
265 }
266 }
267 _ => {
268 new_barrier.command = Some((command, notifiers));
269 Ok(Some(new_barrier))
270 }
271 }
272}
273
274fn build_reschedule_from_context(
275 env: &MetaSrvEnv,
276 worker_nodes: HashMap<WorkerId, WorkerNode>,
277 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
278 database_id: DatabaseId,
279 context: RescheduleContext,
280 database_info: &InflightDatabaseInfo,
281) -> MetaResult<Option<crate::barrier::ReschedulePlan>> {
282 if worker_nodes.is_empty() {
283 return Err(anyhow!("no active streaming workers for reschedule").into());
284 }
285
286 if context.is_empty() {
287 return Ok(None);
288 }
289
290 let all_prev_fragments = database_info
293 .fragment_infos()
294 .map(|fragment| (fragment.fragment_id, fragment))
295 .collect();
296
297 let previewed = preview_actor_assignments(
298 &worker_nodes,
299 adaptive_parallelism_strategy,
300 &context.loaded,
301 )?;
302
303 if rendered_layout_matches_current(&previewed.fragments, &all_prev_fragments)? {
304 return Ok(None);
305 }
306
307 let actor_id_counter = env.actor_id_generator();
308 let rendered = materialize_actor_assignments(actor_id_counter, previewed);
311 let mut commands = build_reschedule_commands(rendered.fragments, context, all_prev_fragments)?;
312 Ok(commands.remove(&database_id))
313}
314
impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
    /// Create a barrier worker backed by the production context implementation,
    /// wiring together the meta subsystems (metadata, hummock, source, sink,
    /// scale, refresh) and starting in the `Starting` status.
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
        refresh_manager: GlobalRefreshManagerRef,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
            refresh_manager,
            sink_manager,
        ));

        Self::new_inner(env, request_rx, context).await
    }

    /// Spawn the worker onto the runtime (registered in the await-tree), and
    /// return its join handle plus a one-shot sender that triggers shutdown.
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

    /// Check whether this bootstrap should start with all data sources paused
    /// (system parameter `pause_on_next_bootstrap` or the offline meta option).
    /// The system parameter is consumed: it is reset to `false` once observed.
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap()
            || self.env.opts.pause_on_next_bootstrap_offline;

        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                It will now be reset to `false`. \
                To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            // Reset the one-shot parameter so the next restart is unaffected.
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

    /// Run the worker: sanity-check the recovery setting, perform bootstrap
    /// recovery (optionally paused), then hand off to the main event loop.
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            // With recovery disabled, pre-existing jobs could never be brought
            // back up, so refuse to start at all in that case.
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            // Bootstrap recovery: always executed once on startup.
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        // Boxed to keep this (large) future off the caller's stack frame.
        Box::pin(self.run_inner(shutdown_rx)).await
    }
}
430
impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Whether per-database failure isolation is in effect: requires both the
    /// `per_database_isolation` system parameter and an available
    /// `DatabaseFailureIsolation` license feature (warns and reports `false`
    /// when the license check fails).
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

    /// Main event loop of the barrier worker.
    ///
    /// Subscribes to local notifications for system-parameter changes, then
    /// `select!`s (biased, in priority order) over: shutdown, barrier-manager
    /// requests, worker membership changes, system-parameter notifications,
    /// barrier-completion results, checkpoint-control state transitions,
    /// partial-graph events, and newly scheduled barriers. Failures are routed
    /// either to per-database recovery (when isolation is enabled) or to global
    /// `failure_recovery`.
    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        loop {
            tokio::select! {
                biased;

                // Shutdown takes precedence over all other events.
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                // Requests from the barrier manager handle (progress queries,
                // adhoc recovery, per-database barrier settings, ...).
                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetBackfillProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_backfill_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            BarrierManagerRequest::GetFragmentBackfillProgress(result_tx) => {
                                let progress =
                                    self.checkpoint_control.gen_fragment_backfill_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send get fragment backfill progress");
                                }
                            }
                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_cdc_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send get ddl progress");
                                }
                            }
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier( UpdateDatabaseBarrierRequest {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            }) => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                            BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
                                if tx.send(self.checkpoint_control.may_have_snapshot_backfilling_jobs()).is_err() {
                                    warn!("failed to may have snapshot backfill job");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                // Streaming worker membership changes: keep the partial graph
                // manager's per-worker control streams in sync.
                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    match changed_worker {
                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
                            self.partial_graph_manager.add_worker(node, &*self.context).await;
                        }
                        ActiveStreamingWorkerChange::Remove(node) => {
                            self.partial_graph_manager.remove_worker(node);
                        }
                    }
                }

                // System-parameter changes: refresh the cached barrier settings.
                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                            self.adaptive_parallelism_strategy = p.adaptive_parallelism_strategy();
                        }
                    }
                }
                // Completion of the in-flight barrier-commit task: ack on
                // success, fall into global recovery on failure.
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.partial_graph_manager,
                        &self.context,
                        &self.env,
                    ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(&mut self.partial_graph_manager, output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                // Per-database recovery state transitions from checkpoint control.
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                // A database is being reset: surface the merged
                                // worker errors, then reload and re-render its
                                // runtime info before re-entering initialization.
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                let result: MetaResult<_> = try {
                                    let runtime_info = self.context.reload_database_runtime_info(database_id).await.inspect_err(|err| {
                                        warn!(%database_id, err = %err.as_report(), "reload runtime info failed");
                                    })?;
                                    let rendered_info = render_runtime_info(
                                        self.env.actor_id_generator(),
                                        &self.active_streaming_nodes,
                                        self.adaptive_parallelism_strategy,
                                        &runtime_info.recovery_context,
                                        database_id,
                                    )
                                    .inspect_err(|err: &MetaError| {
                                        warn!(%database_id, err = %err.as_report(), "render runtime info failed");
                                    })?;
                                    if let Some(rendered_info) = rendered_info {
                                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
                                            database_id,
                                            &rendered_info.job_infos,
                                            &self.active_streaming_nodes,
                                            &rendered_info.stream_actors,
                                            &runtime_info.state_table_committed_epochs,
                                        )
                                        .inspect_err(|err| {
                                            warn!(%database_id, err = ?err.as_report(), "database runtime info failed validation");
                                        })?;
                                        Some((runtime_info, rendered_info))
                                    } else {
                                        None
                                    }
                                };
                                match result {
                                    Ok(Some((runtime_info, rendered_info))) => {
                                        entering_initializing.enter(
                                            runtime_info,
                                            rendered_info,
                                            &mut self.partial_graph_manager,
                                        );
                                    }
                                    Ok(None) => {
                                        // The database no longer exists: unblock it and drop its state.
                                        info!(%database_id, "database removed after reloading empty runtime info");
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                    Err(e) => {
                                        entering_initializing.fail_reload_runtime_info(e);
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                // Events from per-worker control streams / partial graphs.
                event = self.partial_graph_manager.next_event(&self.context) => {
                    let result: MetaResult<()> = try {
                        match event {
                            PartialGraphManagerEvent::Worker(_worker_id, WorkerEvent::WorkerConnected) => {
                            }
                            PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
                                // Collect every database affected by the failed
                                // worker, either via pending barriers or via its
                                // partial graphs.
                                let failed_databases = self
                                    .checkpoint_control
                                    .databases_failed_at_worker_err(worker_id)
                                    .chain(
                                        affected_partial_graphs
                                            .into_iter()
                                            .map(|partial_graph_id| {
                                                let (database_id, _) = from_partial_graph_id(partial_graph_id);
                                                database_id
                                            })
                                    )
                                    .collect::<HashSet<_>>();
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        // No isolation: escalate to global recovery.
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
                                        }
                                    }
                                } else {
                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
                                let (database_id, _creating_job_id) = from_partial_graph_id(partial_graph_id);
                                match event {
                                    PartialGraphEvent::BarrierCollected(collected_barrier) => {
                                        self.checkpoint_control.barrier_collected(partial_graph_id, collected_barrier, &mut self.periodic_barriers)?;
                                    }
                                    PartialGraphEvent::Error(worker_id) => {
                                        if !self.enable_recovery {
                                            panic!("database {database_id} failure reported from {worker_id} but recovery not enabled")
                                        }
                                        if !self.enable_per_database_isolation() {
                                            // No isolation: escalate to global recovery.
                                            Err(anyhow!("database {database_id} report failure from {worker_id}"))?;
                                        }
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
                                            warn!(%database_id, "database entering recovery");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
                                        }
                                    }
                                    PartialGraphEvent::Reset(reset_resps) => {
                                        self.checkpoint_control.on_partial_graph_reset(partial_graph_id, reset_resps);
                                    }
                                    PartialGraphEvent::Initialized => {
                                        self.checkpoint_control.on_partial_graph_initialized(partial_graph_id);
                                    }
                                }
                            }
                        };
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                // A new barrier is due: resolve any reschedule intent it carries,
                // then hand it to checkpoint control for injection.
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    let database_id = new_barrier.database_id;
                    let new_barrier = if matches!(
                        new_barrier.command,
                        Some((Command::RescheduleIntent { .. }, _))
                    ) {
                        let env = self.env.clone();
                        let worker_nodes = self
                            .active_streaming_nodes
                            .current()
                            .iter()
                            .map(|(worker_id, worker)| (*worker_id, worker.clone()))
                            .collect();
                        let adaptive_parallelism_strategy = self.adaptive_parallelism_strategy;
                        let database_info = self.checkpoint_control.database_info(database_id);
                        match resolve_reschedule_intent(
                            env,
                            worker_nodes,
                            adaptive_parallelism_strategy,
                            database_info,
                            new_barrier,
                        ) {
                            Ok(Some(new_barrier)) => new_barrier,
                            Ok(None) => continue,
                            Err(err) => {
                                self.failure_recovery(err).await;
                                continue;
                            }
                        }
                    } else {
                        new_barrier
                    };
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(
                        new_barrier,
                        &mut self.partial_graph_manager,
                        self.adaptive_parallelism_strategy,
                        self.active_streaming_nodes.current()
                    ) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
                                warn!(%database_id, e = %e.as_report(),"database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.partial_graph_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
        }
    }
}
786
impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
    /// Clean up before entering recovery: drain any in-flight completing task,
    /// then broadcast `err` to every partial graph.
    ///
    /// If a completion task is running, we wait for it: a successfully completed
    /// barrier is still acked into `checkpoint_control` so its epochs are not
    /// lost; join errors and completion errors are only logged since we are
    /// already on a failure path.
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // Take ownership of the current completing task, leaving `None` behind.
        match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None | CompletingTask::Err(_) => {}
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        // The barrier actually completed: ack it so its epochs
                        // are recorded before recovery proceeds.
                        self.checkpoint_control.ack_completed(
                            &mut self.partial_graph_manager,
                            BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            },
                        );
                    }
                }
            }
        };
        self.partial_graph_manager.notify_all_err(err);
    }
}
824
825impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
826 async fn failure_recovery(&mut self, err: MetaError) {
828 self.clear_on_err(&err).await;
829
830 if self.enable_recovery {
831 let span = tracing::info_span!(
832 "failure_recovery",
833 error = %err.as_report(),
834 );
835
836 crate::telemetry::report_event(
837 risingwave_pb::telemetry::TelemetryEventStage::Recovery,
838 "failure_recovery",
839 0,
840 None,
841 None,
842 None,
843 );
844
845 let reason = RecoveryReason::Failover(err);
846
847 self.recovery(false, reason).instrument(span).await;
850 } else {
851 panic!(
852 "a streaming error occurred while recovery is disabled, aborting: {:?}",
853 err.as_report()
854 );
855 }
856 }
857
858 async fn adhoc_recovery(&mut self) {
859 let err = MetaErrorInner::AdhocRecovery.into();
860 self.clear_on_err(&err).await;
861
862 let span = tracing::info_span!(
863 "adhoc_recovery",
864 error = %err.as_report(),
865 );
866
867 crate::telemetry::report_event(
868 risingwave_pb::telemetry::TelemetryEventStage::Recovery,
869 "adhoc_recovery",
870 0,
871 None,
872 None,
873 None,
874 );
875
876 self.recovery(false, RecoveryReason::Adhoc)
879 .instrument(span)
880 .await;
881 }
882}
883
884impl<C> GlobalBarrierWorker<C> {
885 pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
887 use risingwave_pb::meta::event_log;
889 let event = event_log::EventCollectBarrierFail {
890 error: error.to_report_string(),
891 };
892 env.event_log_manager_ref()
893 .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
894 }
895}
896
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    /// Base interval (in milliseconds) of the exponential backoff between
    /// recovery attempts.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    /// Upper bound on the backoff delay between recovery attempts.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    /// A pinned sleep future representing one backoff interval.
    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    /// Turn a backoff duration into its corresponding sleep future.
    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    /// Opaque iterator of backoff futures; the concrete type is defined by
    /// [`get_retry_backoff_strategy`] via `#[define_opaque]`.
    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// Jittered exponential backoff starting at `RECOVERY_RETRY_BASE_INTERVAL`
    /// ms, capped at `RECOVERY_RETRY_MAX_INTERVAL`, with no retry limit.
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}
947
948pub(crate) use retry_strategy::*;
949use risingwave_common::error::tonic::extra::{Score, ScoredError};
950use risingwave_pb::meta::event_log::{Event, EventRecovery};
951
952use crate::barrier::partial_graph::{
953 PartialGraphEvent, PartialGraphManager, PartialGraphManagerEvent, WorkerEvent,
954};
955
956impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
957 pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
965 self.partial_graph_manager.clear_worker();
967
968 let reason_str = match &recovery_reason {
969 RecoveryReason::Bootstrap => "bootstrap".to_owned(),
970 RecoveryReason::Failover(err) => {
971 format!("failed over: {}", err.as_report())
972 }
973 RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
974 };
975 self.context.abort_and_mark_blocked(None, recovery_reason);
976
977 self.recovery_inner(is_paused, reason_str).await;
978 self.context.mark_ready(MarkReadyOptions::Global {
979 blocked_databases: self.checkpoint_control.recovering_databases().collect(),
980 });
981 }
982
983 #[await_tree::instrument("recovery({recovery_reason})")]
984 async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
985 let event_log_manager_ref = self.env.event_log_manager_ref();
986
987 tracing::info!("recovery start!");
988 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
989 EventRecovery::global_recovery_start(recovery_reason.clone()),
990 )]);
991
992 let retry_strategy = get_retry_strategy();
993
994 let recovery_timer = GLOBAL_META_METRICS
997 .recovery_latency
998 .with_label_values(&["global"])
999 .start_timer();
1000
1001 let enable_per_database_isolation = self.enable_per_database_isolation();
1002
1003 let recovery_future = tokio_retry::Retry::spawn(retry_strategy, || async {
1004 self.env.stream_client_pool().invalidate_all();
1005 self.context
1010 .notify_creating_job_failed(None, recovery_reason.clone())
1011 .await;
1012
1013 let runtime_info_snapshot = self
1014 .context
1015 .reload_runtime_info()
1016 .await?;
1017 let BarrierWorkerRuntimeInfoSnapshot {
1018 active_streaming_nodes,
1019 recovery_context,
1020 mut state_table_committed_epochs,
1021 mut state_table_log_epochs,
1022 mut mv_depended_subscriptions,
1023 mut background_jobs,
1024 hummock_version_stats,
1025 database_infos,
1026 mut cdc_table_snapshot_splits,
1027 } = runtime_info_snapshot;
1028
1029 let mut partial_graph_manager = PartialGraphManager::recover(
1030 self.env.clone(),
1031 active_streaming_nodes.current(),
1032 self.context.clone(),
1033 )
1034 .await;
1035 {
1036 let mut empty_databases = HashSet::new();
1037 let mut collected_databases = HashMap::new();
1038 let mut collecting_databases = HashMap::new();
1039 let mut failed_databases = HashMap::new();
// Per-database recovery: render each database's runtime info and inject its
// initial barrier. Databases with nothing to render are recorded as empty;
// databases that fail are recorded in `failed_databases` so the remaining
// databases can still recover independently.
1040 for &database_id in recovery_context.fragment_context.database_map.keys() {
1041 let mut recoverer = partial_graph_manager.start_recover();
// `try` block: any `?` failure below lands in the single `match result` that follows.
1042 let result: MetaResult<_> = try {
// `None` from rendering means this database currently has no runtime info to
// recover — record it as empty and skip barrier injection.
1043 let Some(rendered_info) = render_runtime_info(
1044 self.env.actor_id_generator(),
1045 &active_streaming_nodes,
1046 self.adaptive_parallelism_strategy,
1047 &recovery_context,
1048 database_id,
1049 )
1050 .inspect_err(|err: &MetaError| {
1051 warn!(%database_id, err = %err.as_report(), "render runtime info failed");
1052 })? else {
1053 empty_databases.insert(database_id);
1054 continue;
1055 };
// Cross-check the rendered info against the active worker set and the
// committed state-table epochs before using it.
1056 BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
1057 database_id,
1058 &rendered_info.job_infos,
1059 &active_streaming_nodes,
1060 &rendered_info.stream_actors,
1061 &state_table_committed_epochs,
1062 )
1063 .inspect_err(|err| {
1064 warn!(%database_id, err = %err.as_report(), "rendered runtime info failed validation");
1065 })?;
1066 let RenderedDatabaseRuntimeInfo {
1067 job_infos,
1068 stream_actors,
1069 mut source_splits,
1070 } = rendered_info;
// Inject the database's initial barrier. The `&mut` maps are drained per
// database here; leftovers are warned about after the loop below.
1071 recoverer.inject_database_initial_barrier(
1072 database_id,
1073 job_infos,
1074 &recovery_context.job_extra_info,
1075 &mut state_table_committed_epochs,
1076 &mut state_table_log_epochs,
1077 &recovery_context.fragment_relations,
1078 &stream_actors,
1079 &mut source_splits,
1080 &mut background_jobs,
1081 &mut mv_depended_subscriptions,
1082 is_paused,
1083 &hummock_version_stats,
1084 &mut cdc_table_snapshot_splits,
1085 )?
1086 };
// On success, track the database until all its initial barriers are collected;
// on failure, record its partial graphs as failed and move to the next database.
1087 let collector = match result {
1088 Ok(database) => {
1089 DatabaseInitialBarrierCollector {
1090 database_id,
1091 initializing_partial_graphs: recoverer.all_initializing(),
1092 database,
1093 }
1094 }
1095 Err(e) => {
1096 warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
1097 assert!(failed_databases.insert(database_id, recoverer.failed()).is_none(), "non-duplicate");
1098 continue;
1099 }
1100 };
// A collector with nothing left to wait for (no node received the initial
// barrier) can be finished immediately instead of entering the wait loop.
1101 if !collector.is_collected() {
1102 assert!(collecting_databases.insert(database_id, collector).is_none());
1103 } else {
1104 warn!(%database_id, "database has no node to inject initial barrier");
1105 assert!(collected_databases.insert(database_id, collector.finish()).is_none());
1106 }
1107 }
1108 if !empty_databases.is_empty() {
1109 info!(?empty_databases, "empty database in global recovery");
1110 }
// Wait until every collecting database either finishes or fails, driven by
// worker and partial-graph events from the partial-graph manager.
1111 while !collecting_databases.is_empty() {
1112 match partial_graph_manager.next_event(&self.context).await {
1113 PartialGraphManagerEvent::Worker(_, WorkerEvent::WorkerConnected) => {
1114 }
// A worker error fails every database that either has a partial graph on the
// affected worker or cannot remain valid after the worker error; their partial
// graphs are reset and the databases recorded as failed.
1116 PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
1117 let affected_databases: HashSet<_> = affected_partial_graphs.into_iter().map(|partial_graph_id| {
1118 let (database_id, _) = from_partial_graph_id(partial_graph_id);
1119 database_id
1120 }).collect();
1121 warn!(%worker_id, err = %err.as_report(), "worker node failure during recovery");
1122 for (failed_database_id, collector) in collecting_databases.extract_if(|database_id, collector| {
1123 !collector.is_valid_after_worker_err(worker_id) || affected_databases.contains(database_id)
1124 }) {
1125 warn!(%failed_database_id, %worker_id, "database failed to recovery in global recovery due to worker node err");
1126 let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
1127 partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1128 assert!(failed_databases.insert(failed_database_id, resetting_partial_graphs).is_none());
1129 }
1130 }
1131 PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
1132 match event {
// During initialization no barriers are being collected and no resets have
// been requested, so these two events cannot occur here.
1133 PartialGraphEvent::BarrierCollected(_) => {
1134 unreachable!("no barrier collected event on initializing")
1135 }
1136 PartialGraphEvent::Reset(_) => {
1137 unreachable!("no partial graph reset on initializing")
1138 }
// An error on one partial graph fails its whole owning database, whether it is
// still collecting or was already fully collected; otherwise it must already
// be recorded as failed.
1139 PartialGraphEvent::Error(worker_id) => {
1140 let (database_id, _) = from_partial_graph_id(partial_graph_id);
1141 if let Some(collector) = collecting_databases.remove(&database_id) {
1142 warn!(%database_id, %worker_id, "database reset during global recovery");
1143 let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
1144 partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1145 assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
1146 } else if let Some(database) = collected_databases.remove(&database_id) {
1147 warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
1148 let resetting_partial_graphs: HashSet<_> = database_partial_graphs(database_id, database.independent_checkpoint_job_controls.keys().copied()).collect();
1149 partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1150 assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
1151 } else {
1152 assert!(failed_databases.contains_key(&database_id));
1153 }
1154 }
// One partial graph finished initializing. Events for already-failed databases
// are ignored; the database graduates to `collected_databases` once all of its
// partial graphs have initialized.
1155 PartialGraphEvent::Initialized => {
1156 let (database_id, _) = from_partial_graph_id(partial_graph_id);
1157 if failed_databases.contains_key(&database_id) {
1158 assert!(!collecting_databases.contains_key(&database_id));
1159 continue;
1161 }
1162 let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
1163 unreachable!("should exist")
1164 };
1165 let collector = entry.get_mut();
1166 collector.partial_graph_initialized(partial_graph_id);
1167 if collector.is_collected() {
1168 let collector = entry.remove();
1169 assert!(collected_databases.insert(database_id, collector.finish()).is_none());
1170 }
1171 }
1172 }
1173 }
1174 }
1175 }
1176 debug!("collected initial barrier");
// Warn about recovered state that no surviving database consumed during
// injection — useful for diagnosing inconsistencies after recovery.
1177 if !background_jobs.is_empty() {
1178 warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background mview in recovery");
1179 }
1180 if !mv_depended_subscriptions.is_empty() {
1181 warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
1182 }
1183 if !state_table_committed_epochs.is_empty() {
1184 warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
1185 }
// Without per-database isolation, a single failed database fails this whole
// global recovery attempt.
1186 if !enable_per_database_isolation && !failed_databases.is_empty() {
1187 return Err(anyhow!(
1188 "global recovery failed due to failure of databases {:?}",
1189 failed_databases.keys().collect_vec()).into()
1190 );
1191 }
1192 let checkpoint_control = CheckpointControl::recover(
1193 collected_databases,
1194 failed_databases,
1195 hummock_version_stats,
1196 self.env.clone(),
1197 );
1198
// Rebuild the periodic barrier scheduler from the current system parameters.
1199 let reader = self.env.system_params_reader().await;
1200 let checkpoint_frequency = reader.checkpoint_frequency();
1201 let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
1202 let periodic_barriers = PeriodicBarriers::new(
1203 barrier_interval,
1204 checkpoint_frequency,
1205 database_infos,
1206 );
1207
1208 Ok((
1209 active_streaming_nodes,
1210 partial_graph_manager,
1211 checkpoint_control,
1212 periodic_barriers,
1213 ))
1214 }
// On any failure of the recovery attempt: log it, record an event-log entry,
// and bump the failure metric. The attempt is expected to be retried until it
// succeeds (see the `expect` message on `recovery_future` below).
1215 }.inspect_err(|err: &MetaError| {
1216 tracing::error!(error = %err.as_report(), "recovery failed");
1217 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1218 EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
1219 )]);
1220 GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
1221 }))
1222 .instrument(tracing::info_span!("recovery_attempt"));
1223
// While the recovery future runs, keep draining the request channel:
// progress queries fail fast with an "under recovery" error, while ad-hoc
// recovery acks and barrier-setting updates are deferred until after recovery.
1224 let mut recover_txs = vec![];
1225 let mut update_barrier_requests = vec![];
1226 pin_mut!(recovery_future);
1227 let mut request_rx_closed = false;
1228 let new_state = loop {
1229 select! {
// `biased;`: poll arms in written order, preferring recovery completion over
// draining further requests.
1230 biased;
1231 new_state = &mut recovery_future => {
1232 break new_state.expect("Retry until recovery success.");
1233 }
// Once the request channel closes, disable this arm and keep waiting on the
// recovery future alone.
1234 request = pin!(self.request_rx.recv()), if !request_rx_closed => {
1235 let Some(request) = request else {
1236 warn!("request rx channel closed during recovery");
1237 request_rx_closed = true;
1238 continue;
1239 };
1240 match request {
1241 BarrierManagerRequest::GetBackfillProgress(tx) => {
1242 let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1243 }
1244 BarrierManagerRequest::GetFragmentBackfillProgress(tx) => {
1245 let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1246 }
1247 BarrierManagerRequest::GetCdcProgress(tx) => {
1248 let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1249 }
1250 BarrierManagerRequest::AdhocRecovery(tx) => {
1251 recover_txs.push(tx);
1252 }
1253 BarrierManagerRequest::UpdateDatabaseBarrier(request) => {
1254 update_barrier_requests.push(request);
1255 }
// NOTE(review): answers `true` unconditionally while recovering — presumably
// a conservative "may have" answer; confirm against the request's caller.
1256 BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
1257 let _ = tx.send(true);
1259 }
1260 }
1261 }
1262 }
1263 };
1264
1265 let duration = recovery_timer.stop_and_record();
1266
// Install the freshly recovered state onto `self` in one destructuring assignment.
1267 (
1268 self.active_streaming_nodes,
1269 self.partial_graph_manager,
1270 self.checkpoint_control,
1271 self.periodic_barriers,
1272 ) = new_state;
1273
1274 tracing::info!("recovery success");
1275
// Replay the barrier-setting updates that arrived during recovery, then ack
// each sender.
1276 for UpdateDatabaseBarrierRequest {
1277 database_id,
1278 barrier_interval_ms,
1279 checkpoint_frequency,
1280 sender,
1281 } in update_barrier_requests
1282 {
1283 self.periodic_barriers.update_database_barrier(
1284 database_id,
1285 barrier_interval_ms,
1286 checkpoint_frequency,
1287 );
1288 let _ = sender.send(());
1289 }
1290
// Ack the ad-hoc recovery requests that were deferred during recovery.
1291 for tx in recover_txs {
1292 let _ = tx.send(());
1293 }
1294
// Snapshot database states for the success event-log entry below.
1295 let recovering_databases = self
1296 .checkpoint_control
1297 .recovering_databases()
1298 .map(|database| database.as_raw_id())
1299 .collect_vec();
1300 let running_databases = self
1301 .checkpoint_control
1302 .running_databases()
1303 .map(|database| database.as_raw_id())
1304 .collect_vec();
1305
1306 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1307 EventRecovery::global_recovery_success(
1308 recovery_reason.clone(),
1309 duration as f32,
1310 running_databases,
1311 recovering_databases,
1312 ),
1313 )]);
1314
// Notify frontend and compute nodes that a recovery has taken place.
1315 self.env
1316 .notification_manager()
1317 .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
1318 self.env
1319 .notification_manager()
1320 .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
1321 }
1322}