use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::{TryFutureExt, pin_mut};
use itertools::Itertools;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::system_param::{AdaptiveParallelismStrategy, PAUSE_ON_NEXT_BOOTSTRAP_KEY};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::Recovery;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::task::JoinHandle;
use tonic::Status;
use tracing::{Instrument, debug, error, info, warn};

use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
use crate::barrier::context::recovery::{RenderedDatabaseRuntimeInfo, render_runtime_info};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::info::InflightDatabaseInfo;
use crate::barrier::rpc::{
    DatabaseInitialBarrierCollector, database_partial_graphs, from_partial_graph_id,
    merge_node_rpc_errors,
};
use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
use crate::barrier::{
    BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
    RecoveryReason, RescheduleContext, UpdateDatabaseBarrierRequest, schedule,
};
use crate::controller::scale::{materialize_actor_assignments, preview_actor_assignments};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
    MetadataManager,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{
    GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef, build_reschedule_commands,
    rendered_layout_matches_current,
};
use crate::{MetaError, MetaResult};

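/// Worker that drives the global barrier loop: it schedules and injects
/// barriers, collects them from the partial graphs on compute nodes, and
/// triggers per-database or global recovery when collection fails.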
pub(super) struct GlobalBarrierWorker<C> {
    /// Whether to attempt recovery on failure; when disabled, any failure panics.
    enable_recovery: bool,

    /// Scheduler of periodic and manually scheduled barriers, per database.
    periodic_barriers: PeriodicBarriers,

    /// Cached value of the `per_database_isolation` system parameter.
    system_enable_per_database_isolation: bool,

    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,

    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    checkpoint_control: CheckpointControl,

    /// The barrier that has been collected but is still completing, if any.
    completing_task: CompletingTask,

    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    active_streaming_nodes: ActiveStreamingWorkerNodes,

    partial_graph_manager: PartialGraphManager,
}

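// A `RescheduleIntent` resolved while no streaming workers are active should be
// dropped, and its `started` notifier completed with an error.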
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tokio::sync::oneshot;

    use super::*;
    use crate::barrier::RescheduleContext;
    use crate::barrier::notifier::Notifier;

    #[tokio::test]
    async fn test_reschedule_intent_without_workers_notifies_start_failed() {
        let env = MetaSrvEnv::for_test().await;
        let database_id = DatabaseId::new(1);
        let database_info =
            InflightDatabaseInfo::empty(database_id, env.shared_actor_infos().clone());
        let (started_tx, started_rx) = oneshot::channel();
        let (collected_tx, _collected_rx) = oneshot::channel();

        let notifier = Notifier {
            started: Some(started_tx),
            collected: Some(collected_tx),
        };

        let new_barrier = schedule::NewBarrier {
            database_id,
            command: Some((
                Command::RescheduleIntent {
                    context: RescheduleContext::empty(),
                    reschedule_plan: None,
                },
                vec![notifier],
            )),
            span: tracing::Span::none(),
            checkpoint: false,
        };

        let result = resolve_reschedule_intent(
            env,
            HashMap::new(),
            AdaptiveParallelismStrategy::default(),
            Some(&database_info),
            new_barrier,
        );

        assert!(matches!(result, Ok(None)));
        let started = started_rx.await.expect("started notifier dropped");
        assert!(started.is_err());
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
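    /// Create a worker whose runtime state (worker nodes, partial graphs) is
    /// left uninitialized; it is populated during bootstrap recovery.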
    pub(super) async fn new_inner(
        env: MetaSrvEnv,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        context: Arc<C>,
    ) -> Self {
        let enable_recovery = env.opts.enable_recovery;

        let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

        let partial_graph_manager = PartialGraphManager::uninitialized(env.clone());

        let reader = env.system_params_reader().await;
        let system_enable_per_database_isolation = reader.per_database_isolation();
        let periodic_barriers = PeriodicBarriers::default();
        let adaptive_parallelism_strategy = reader.adaptive_parallelism_strategy();

        let checkpoint_control = CheckpointControl::new(env.clone());
        Self {
            enable_recovery,
            periodic_barriers,
            system_enable_per_database_isolation,
            adaptive_parallelism_strategy,
            context,
            env,
            checkpoint_control,
            completing_task: CompletingTask::None,
            request_rx,
            active_streaming_nodes,
            partial_graph_manager,
        }
    }
}

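/// Resolve a `RescheduleIntent` command before injecting its barrier. If the
/// intent does not carry a plan yet, render one from the reschedule context.
/// Returns `Ok(None)` when the barrier should be dropped: either the
/// reschedule is a no-op or rendering failed and the notifiers have already
/// been informed.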
fn resolve_reschedule_intent(
    env: MetaSrvEnv,
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    database_info: Option<&InflightDatabaseInfo>,
    mut new_barrier: schedule::NewBarrier,
) -> MetaResult<Option<schedule::NewBarrier>> {
    let Some((command, notifiers)) = new_barrier.command.take() else {
        return Ok(Some(new_barrier));
    };

    match command {
        Command::RescheduleIntent {
            context,
            reschedule_plan,
        } => {
            if let Some(reschedule_plan) = reschedule_plan {
                // A plan has already been rendered; inject the barrier as-is.
                new_barrier.command = Some((
                    Command::RescheduleIntent {
                        context,
                        reschedule_plan: Some(reschedule_plan),
                    },
                    notifiers,
                ));
                return Ok(Some(new_barrier));
            }
            let span = tracing::info_span!(
                "resolve_reschedule_intent",
                database_id = %new_barrier.database_id
            );
            let reschedule_plan = {
                let _guard = span.enter();
                build_reschedule_from_context(
                    &env,
                    worker_nodes,
                    adaptive_parallelism_strategy,
                    new_barrier.database_id,
                    context,
                    database_info.ok_or_else(|| {
                        anyhow!(
                            "database {} not found when resolving reschedule intent",
                            new_barrier.database_id
                        )
                    })?,
                )
            };
            match reschedule_plan {
                Ok(Some(reschedule_plan)) => {
                    new_barrier.command = Some((
                        Command::RescheduleIntent {
                            context: RescheduleContext::empty(),
                            reschedule_plan: Some(reschedule_plan),
                        },
                        notifiers,
                    ));
                    Ok(Some(new_barrier))
                }
                Ok(None) => {
                    // The reschedule is a no-op: complete the command
                    // immediately and drop the barrier.
                    for mut notifier in notifiers {
                        notifier.notify_started();
                        notifier.notify_collected();
                    }
                    Ok(None)
                }
                Err(err) => {
                    for notifier in notifiers {
                        notifier.notify_start_failed(err.clone());
                    }
                    Ok(None)
                }
            }
        }
        _ => {
            new_barrier.command = Some((command, notifiers));
            Ok(Some(new_barrier))
        }
    }
}

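/// Render a reschedule plan for one database: preview actor assignments on the
/// current workers, bail out early if the rendered layout matches the current
/// one, and otherwise materialize actor IDs and build reschedule commands.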
fn build_reschedule_from_context(
    env: &MetaSrvEnv,
    worker_nodes: HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    database_id: DatabaseId,
    context: RescheduleContext,
    database_info: &InflightDatabaseInfo,
) -> MetaResult<Option<crate::barrier::ReschedulePlan>> {
    if worker_nodes.is_empty() {
        return Err(anyhow!("no active streaming workers for reschedule").into());
    }

    if context.is_empty() {
        return Ok(None);
    }

    let all_prev_fragments = database_info
        .fragment_infos()
        .map(|fragment| (fragment.fragment_id, fragment))
        .collect();

    let previewed = preview_actor_assignments(
        &worker_nodes,
        adaptive_parallelism_strategy,
        &context.loaded,
    )?;

    // Skip the reschedule entirely if the rendered layout is identical to the
    // current one.
    if rendered_layout_matches_current(&previewed.fragments, &all_prev_fragments)? {
        return Ok(None);
    }

    let actor_id_counter = env.actor_id_generator();
    let rendered = materialize_actor_assignments(actor_id_counter, previewed);
    let mut commands = build_reschedule_commands(rendered.fragments, context, all_prev_fragments)?;
    Ok(commands.remove(&database_id))
}

impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
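    /// Assemble the worker context from the meta server components and
    /// delegate to [`Self::new_inner`].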
    pub async fn new(
        scheduled_barriers: schedule::ScheduledBarriers,
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        source_manager: SourceManagerRef,
        sink_manager: SinkCoordinatorManager,
        scale_controller: ScaleControllerRef,
        request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
        barrier_scheduler: schedule::BarrierScheduler,
        refresh_manager: GlobalRefreshManagerRef,
    ) -> Self {
        let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));

        let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
            scheduled_barriers,
            status,
            metadata_manager,
            hummock_manager,
            source_manager,
            scale_controller,
            env.clone(),
            barrier_scheduler,
            refresh_manager,
            sink_manager,
        ));

        Self::new_inner(env, request_rx, context).await
    }

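    /// Spawn the worker loop. Returns the join handle and a shutdown sender.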
    pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let fut = (self.env.await_tree_reg())
            .register_derived_root("Global Barrier Worker")
            .instrument(self.run(shutdown_rx));
        let join_handle = tokio::spawn(fut);

        (join_handle, shutdown_tx)
    }

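    /// Check whether the cluster should bootstrap with all data sources
    /// paused, and reset the one-shot `pause_on_next_bootstrap` system
    /// parameter back to `false` if it was set.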
    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
        let paused = self
            .env
            .system_params_reader()
            .await
            .pause_on_next_bootstrap()
            || self.env.opts.pause_on_next_bootstrap_offline;

        if paused {
            warn!(
                "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
                It will now be reset to `false`. \
                To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
                PAUSE_ON_NEXT_BOOTSTRAP_KEY
            );
            self.env
                .system_params_manager_impl_ref()
                .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
                .await?;
        }
        Ok(paused)
    }

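    /// Entry point of the worker: refuses to start without recovery if
    /// streaming jobs already exist, performs bootstrap recovery, and then
    /// enters the main loop in [`Self::run_inner`].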
    async fn run(mut self, shutdown_rx: Receiver<()>) {
        tracing::info!(
            "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
            self.enable_recovery,
            self.checkpoint_control.in_flight_barrier_nums,
        );

        if !self.enable_recovery {
            let job_exist = self
                .context
                .metadata_manager
                .catalog_controller
                .has_any_streaming_jobs()
                .await
                .unwrap();
            if job_exist {
                panic!(
                    "Some streaming jobs already exist in meta, please start with recovery enabled \
                    or clean up the metadata using `./risedev clean-data`"
                );
            }
        }

        {
            let span = tracing::info_span!("bootstrap_recovery");
            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "normal_recovery",
                0,
                None,
                None,
                None,
            );

            let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);

            self.recovery(paused, RecoveryReason::Bootstrap)
                .instrument(span)
                .await;
        }

        Box::pin(self.run_inner(shutdown_rx)).await
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
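    /// Per-database failure isolation takes effect only when both the system
    /// parameter is set and the `DatabaseFailureIsolation` license feature is
    /// available.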
    fn enable_per_database_isolation(&self) -> bool {
        self.system_enable_per_database_isolation && {
            if let Err(e) =
                risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
            {
                warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
                false
            } else {
                true
            }
        }
    }

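    /// The main loop: multiplexes shutdown, manager requests, worker
    /// membership changes, system parameter changes, barrier completion,
    /// checkpoint-control state transitions, partial-graph events, and newly
    /// scheduled barriers.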
    pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        self.env
            .notification_manager()
            .insert_local_sender(local_notification_tx);

        loop {
            tokio::select! {
                biased;

                // Shutdown signal from the meta service.
                _ = &mut shutdown_rx => {
                    tracing::info!("Barrier manager is stopped");
                    break;
                }

                request = self.request_rx.recv() => {
                    if let Some(request) = request {
                        match request {
                            BarrierManagerRequest::GetBackfillProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_backfill_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send backfill progress");
                                }
                            }
                            BarrierManagerRequest::GetFragmentBackfillProgress(result_tx) => {
                                let progress =
                                    self.checkpoint_control.gen_fragment_backfill_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send fragment backfill progress");
                                }
                            }
                            BarrierManagerRequest::GetCdcProgress(result_tx) => {
                                let progress = self.checkpoint_control.gen_cdc_progress();
                                if result_tx.send(Ok(progress)).is_err() {
                                    error!("failed to send cdc progress");
                                }
                            }
                            BarrierManagerRequest::AdhocRecovery(sender) => {
                                self.adhoc_recovery().await;
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of adhoc recovery");
                                }
                            }
                            BarrierManagerRequest::UpdateDatabaseBarrier(UpdateDatabaseBarrierRequest {
                                database_id,
                                barrier_interval_ms,
                                checkpoint_frequency,
                                sender,
                            }) => {
                                self.periodic_barriers
                                    .update_database_barrier(
                                        database_id,
                                        barrier_interval_ms,
                                        checkpoint_frequency,
                                    );
                                if sender.send(()).is_err() {
                                    warn!("failed to notify finish of update database barrier");
                                }
                            }
                            BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
                                if tx.send(self.checkpoint_control.may_have_snapshot_backfilling_jobs()).is_err() {
                                    warn!("failed to send may-have-snapshot-backfilling-job result");
                                }
                            }
                        }
                    } else {
                        tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
                        return;
                    }
                }

                changed_worker = self.active_streaming_nodes.changed() => {
                    #[cfg(debug_assertions)]
                    {
                        self.active_streaming_nodes.validate_change().await;
                    }

                    info!(?changed_worker, "worker changed");

                    match changed_worker {
                        ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) => {
                            self.partial_graph_manager.add_worker(node, &*self.context).await;
                        }
                        ActiveStreamingWorkerChange::Remove(node) => {
                            self.partial_graph_manager.remove_worker(node);
                        }
                    }
                }

                notification = local_notification_rx.recv() => {
                    let notification = notification.unwrap();
                    if let LocalNotification::SystemParamsChange(p) = notification {
                        {
                            self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
                            self.periodic_barriers
                                .set_sys_checkpoint_frequency(p.checkpoint_frequency());
                            self.system_enable_per_database_isolation = p.per_database_isolation();
                            self.adaptive_parallelism_strategy = p.adaptive_parallelism_strategy();
                        }
                    }
                }
                complete_result = self
                    .completing_task
                    .next_completed_barrier(
                        &mut self.periodic_barriers,
                        &mut self.checkpoint_control,
                        &mut self.partial_graph_manager,
                        &self.context,
                        &self.env,
                    ) => {
                    match complete_result {
                        Ok(output) => {
                            self.checkpoint_control.ack_completed(output);
                        }
                        Err(e) => {
                            self.failure_recovery(e).await;
                        }
                    }
                },
                event = self.checkpoint_control.next_event() => {
                    let result: MetaResult<()> = try {
                        match event {
                            CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
                                let database_id = entering_initializing.database_id();
                                let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
                                    resp.root_err.as_ref().map(|root_err| {
                                        (*worker_id, ScoredError {
                                            error: Status::internal(&root_err.err_msg),
                                            score: Score(root_err.score)
                                        })
                                    })
                                }));
                                Self::report_collect_failure(&self.env, &error);
                                self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
                                let result: MetaResult<_> = try {
                                    let runtime_info = self.context.reload_database_runtime_info(database_id).await.inspect_err(|err| {
                                        warn!(%database_id, err = %err.as_report(), "reload runtime info failed");
                                    })?;
                                    let rendered_info = render_runtime_info(
                                        self.env.actor_id_generator(),
                                        &self.active_streaming_nodes,
                                        self.adaptive_parallelism_strategy,
                                        &runtime_info.recovery_context,
                                        database_id,
                                    )
                                    .inspect_err(|err: &MetaError| {
                                        warn!(%database_id, err = %err.as_report(), "render runtime info failed");
                                    })?;
                                    if let Some(rendered_info) = rendered_info {
                                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
                                            database_id,
                                            &rendered_info.job_infos,
                                            &self.active_streaming_nodes,
                                            &rendered_info.stream_actors,
                                            &runtime_info.state_table_committed_epochs,
                                        )
                                        .inspect_err(|err| {
                                            warn!(%database_id, err = %err.as_report(), "database runtime info failed validation");
                                        })?;
                                        Some((runtime_info, rendered_info))
                                    } else {
                                        None
                                    }
                                };
                                match result {
                                    Ok(Some((runtime_info, rendered_info))) => {
                                        entering_initializing.enter(
                                            runtime_info,
                                            rendered_info,
                                            &mut self.partial_graph_manager,
                                        );
                                    }
                                    Ok(None) => {
                                        info!(%database_id, "database removed after reloading empty runtime info");
                                        self.context.mark_ready(MarkReadyOptions::Database(database_id));
                                        entering_initializing.remove();
                                    }
                                    Err(e) => {
                                        entering_initializing.fail_reload_runtime_info(e);
                                    }
                                }
                            }
                            CheckpointControlEvent::EnteringRunning(entering_running) => {
                                self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
                                entering_running.enter();
                            }
                        }
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                event = self.partial_graph_manager.next_event(&self.context) => {
                    let result: MetaResult<()> = try {
                        match event {
                            PartialGraphManagerEvent::Worker(_worker_id, WorkerEvent::WorkerConnected) => {}
                            PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
                                let failed_databases = self
                                    .checkpoint_control
                                    .databases_failed_at_worker_err(worker_id)
                                    .chain(
                                        affected_partial_graphs
                                            .into_iter()
                                            .map(|partial_graph_id| {
                                                let (database_id, _) = from_partial_graph_id(partial_graph_id);
                                                database_id
                                            })
                                    )
                                    .collect::<HashSet<_>>();
                                if !failed_databases.is_empty() {
                                    if !self.enable_recovery {
                                        panic!("control stream to worker {} failed but recovery not enabled: {}", worker_id, err.as_report());
                                    }
                                    if !self.enable_per_database_isolation() {
                                        Err(err.clone())?;
                                    }
                                    Self::report_collect_failure(&self.env, &err);
                                    for database_id in failed_databases {
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
                                            warn!(%worker_id, %database_id, "database entering recovery on node failure");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
                                        }
                                    }
                                } else {
                                    warn!(%worker_id, "no barrier to collect from worker, ignore err");
                                }
                                continue;
                            }
                            PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
                                let (database_id, _creating_job_id) = from_partial_graph_id(partial_graph_id);
                                match event {
                                    PartialGraphEvent::BarrierCollected(collected_barrier) => {
                                        self.checkpoint_control.barrier_collected(partial_graph_id, collected_barrier, &mut self.periodic_barriers)?;
                                    }
                                    PartialGraphEvent::Error(worker_id) => {
                                        if !self.enable_recovery {
                                            panic!("database {database_id} failure reported from {worker_id} but recovery not enabled")
                                        }
                                        if !self.enable_per_database_isolation() {
                                            Err(anyhow!("database {database_id} report failure from {worker_id}"))?;
                                        }
                                        if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
                                            warn!(%database_id, "database entering recovery");
                                            self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
                                            let output = self.completing_task.wait_completing_task().await?;
                                            entering_recovery.enter(output, &mut self.partial_graph_manager);
                                        }
                                    }
                                    PartialGraphEvent::Reset(reset_resps) => {
                                        self.checkpoint_control.on_partial_graph_reset(partial_graph_id, reset_resps);
                                    }
                                    PartialGraphEvent::Initialized => {
                                        self.checkpoint_control.on_partial_graph_initialized(partial_graph_id);
                                    }
                                }
                            }
                        };
                    };
                    if let Err(e) = result {
                        self.failure_recovery(e).await;
                    }
                }
                new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
                    let database_id = new_barrier.database_id;
                    let new_barrier = if matches!(
                        new_barrier.command,
                        Some((Command::RescheduleIntent { .. }, _))
                    ) {
                        let env = self.env.clone();
                        let worker_nodes = self
                            .active_streaming_nodes
                            .current()
                            .iter()
                            .map(|(worker_id, worker)| (*worker_id, worker.clone()))
                            .collect();
                        let adaptive_parallelism_strategy = self.adaptive_parallelism_strategy;
                        let database_info = self.checkpoint_control.database_info(database_id);
                        match resolve_reschedule_intent(
                            env,
                            worker_nodes,
                            adaptive_parallelism_strategy,
                            database_info,
                            new_barrier,
                        ) {
                            Ok(Some(new_barrier)) => new_barrier,
                            Ok(None) => continue,
                            Err(err) => {
                                self.failure_recovery(err).await;
                                continue;
                            }
                        }
                    } else {
                        new_barrier
                    };
                    if let Err(e) = self.checkpoint_control.handle_new_barrier(
                        new_barrier,
                        &mut self.partial_graph_manager,
                        self.adaptive_parallelism_strategy,
                        self.active_streaming_nodes.current()
                    ) {
                        if !self.enable_recovery {
                            panic!(
                                "failed to inject barrier to some databases but recovery not enabled: {:?}", (
                                    database_id,
                                    e.as_report()
                                )
                            );
                        }
                        let result: MetaResult<_> = try {
                            if !self.enable_per_database_isolation() {
                                let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
                                Err(err)?;
                            } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
                                warn!(%database_id, e = %e.as_report(), "database entering recovery on inject failure");
                                self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
                                let output = self.completing_task.wait_completing_task().await?;
                                entering_recovery.enter(output, &mut self.partial_graph_manager);
                            }
                        };
                        if let Err(e) = result {
                            self.failure_recovery(e).await;
                        }
                    }
                }
            }
        }
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
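    /// Wait for the in-flight completing task to finish (acknowledging its
    /// result if it succeeds) and broadcast the error to all partial graphs,
    /// so that recovery starts from a clean state.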
    pub async fn clear_on_err(&mut self, err: &MetaError) {
        // If a task is completing, wait for it so that its result is
        // acknowledged before entering recovery.
        match replace(&mut self.completing_task, CompletingTask::None) {
            CompletingTask::None | CompletingTask::Err(_) => {}
            CompletingTask::Completing {
                epochs_to_ack,
                join_handle,
                ..
            } => {
                info!("waiting for completing command to finish in recovery");
                match join_handle.await {
                    Err(e) => {
                        warn!(err = %e.as_report(), "failed to join completing task");
                    }
                    Ok(Err(e)) => {
                        warn!(
                            err = %e.as_report(),
                            "failed to complete barrier during clear"
                        );
                    }
                    Ok(Ok(hummock_version_stats)) => {
                        self.checkpoint_control
                            .ack_completed(BarrierCompleteOutput {
                                epochs_to_ack,
                                hummock_version_stats,
                            });
                    }
                }
            }
        };
        self.partial_graph_manager.notify_all_err(err);
    }
}

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
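    /// Enter global recovery after an error that cannot be isolated to a
    /// single database. Panics if recovery is disabled.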
    async fn failure_recovery(&mut self, err: MetaError) {
        self.clear_on_err(&err).await;

        if self.enable_recovery {
            let span = tracing::info_span!(
                "failure_recovery",
                error = %err.as_report(),
            );

            crate::telemetry::report_event(
                risingwave_pb::telemetry::TelemetryEventStage::Recovery,
                "failure_recovery",
                0,
                None,
                None,
                None,
            );

            let reason = RecoveryReason::Failover(err);

            self.recovery(false, reason).instrument(span).await;
        } else {
            panic!(
                "a streaming error occurred while recovery is disabled, aborting: {:?}",
                err.as_report()
            );
        }
    }

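    /// Recovery triggered on demand via [`BarrierManagerRequest::AdhocRecovery`].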
    async fn adhoc_recovery(&mut self) {
        let err = MetaErrorInner::AdhocRecovery.into();
        self.clear_on_err(&err).await;

        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
        );

        crate::telemetry::report_event(
            risingwave_pb::telemetry::TelemetryEventStage::Recovery,
            "adhoc_recovery",
            0,
            None,
            None,
            None,
        );

        self.recovery(false, RecoveryReason::Adhoc)
            .instrument(span)
            .await;
    }
}

impl<C> GlobalBarrierWorker<C> {
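    /// Record a barrier-collection failure in the event log for later inspection.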
    pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
        use risingwave_pb::meta::event_log;
        let event = event_log::EventCollectBarrierFail {
            error: error.to_report_string(),
        };
        env.event_log_manager_ref()
            .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
    }
}

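/// Backoff policy for recovery retries: exponential backoff starting at 20ms,
/// capped at 5s, with jitter applied to each delay.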
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    /// Base interval of the exponential backoff, in milliseconds.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    /// Upper bound on a single backoff delay.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// An infinite iterator of jittered, exponentially growing delays.
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}

pub(crate) use retry_strategy::*;
use risingwave_common::error::tonic::extra::{Score, ScoredError};
use risingwave_pb::meta::event_log::{Event, EventRecovery};

use crate::barrier::partial_graph::{
    PartialGraphEvent, PartialGraphManager, PartialGraphManagerEvent, WorkerEvent,
};

impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
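    /// Recover the cluster: clear per-worker state, abort and block all
    /// databases, retry [`Self::recovery_inner`] until it succeeds, and
    /// finally mark the cluster ready, keeping still-recovering databases
    /// blocked.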
    pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
        // Reset per-worker state before starting a new recovery attempt.
        self.partial_graph_manager.clear_worker();

        let reason_str = match &recovery_reason {
            RecoveryReason::Bootstrap => "bootstrap".to_owned(),
            RecoveryReason::Failover(err) => {
                format!("failed over: {}", err.as_report())
            }
            RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
        };
        self.context.abort_and_mark_blocked(None, recovery_reason);

        self.recovery_inner(is_paused, reason_str).await;
        self.context.mark_ready(MarkReadyOptions::Global {
            blocked_databases: self.checkpoint_control.recovering_databases().collect(),
        });
    }

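    /// A single recovery driver, retried until success: reload runtime info,
    /// re-render and validate every database, inject initial barriers and wait
    /// for them to be collected, then rebuild checkpoint control and the
    /// periodic barrier scheduler. Requests arriving meanwhile are rejected or
    /// deferred until recovery completes.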
    #[await_tree::instrument("recovery({recovery_reason})")]
    async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
        let event_log_manager_ref = self.env.event_log_manager_ref();

        tracing::info!("recovery start!");
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_start(recovery_reason.clone()),
        )]);

        let retry_strategy = get_retry_strategy();

        let recovery_timer = GLOBAL_META_METRICS
            .recovery_latency
            .with_label_values(&["global"])
            .start_timer();

        let enable_per_database_isolation = self.enable_per_database_isolation();

        let recovery_future = tokio_retry::Retry::spawn(retry_strategy, || async {
            self.env.stream_client_pool().invalidate_all();
            // Fail any creating-job waiters before reloading runtime info.
            self.context
                .notify_creating_job_failed(None, recovery_reason.clone())
                .await;

            let runtime_info_snapshot = self
                .context
                .reload_runtime_info()
                .await?;
            let BarrierWorkerRuntimeInfoSnapshot {
                active_streaming_nodes,
                recovery_context,
                mut state_table_committed_epochs,
                mut state_table_log_epochs,
                mut mv_depended_subscriptions,
                mut background_jobs,
                hummock_version_stats,
                database_infos,
                mut cdc_table_snapshot_splits,
            } = runtime_info_snapshot;

            let mut partial_graph_manager = PartialGraphManager::recover(
                self.env.clone(),
                active_streaming_nodes.current(),
                self.context.clone(),
            )
            .await;
            {
                let mut empty_databases = HashSet::new();
                let mut collected_databases = HashMap::new();
                let mut collecting_databases = HashMap::new();
                let mut failed_databases = HashMap::new();
                for &database_id in recovery_context.fragment_context.database_map.keys() {
                    let mut recoverer = partial_graph_manager.start_recover();
                    let result: MetaResult<_> = try {
                        let Some(rendered_info) = render_runtime_info(
                            self.env.actor_id_generator(),
                            &active_streaming_nodes,
                            self.adaptive_parallelism_strategy,
                            &recovery_context,
                            database_id,
                        )
                        .inspect_err(|err: &MetaError| {
                            warn!(%database_id, err = %err.as_report(), "render runtime info failed");
                        })? else {
                            empty_databases.insert(database_id);
                            continue;
                        };
                        BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
                            database_id,
                            &rendered_info.job_infos,
                            &active_streaming_nodes,
                            &rendered_info.stream_actors,
                            &state_table_committed_epochs,
                        )
                        .inspect_err(|err| {
                            warn!(%database_id, err = %err.as_report(), "rendered runtime info failed validation");
                        })?;
                        let RenderedDatabaseRuntimeInfo {
                            job_infos,
                            stream_actors,
                            mut source_splits,
                        } = rendered_info;
                        recoverer.inject_database_initial_barrier(
                            database_id,
                            job_infos,
                            &recovery_context.job_extra_info,
                            &mut state_table_committed_epochs,
                            &mut state_table_log_epochs,
                            &recovery_context.fragment_relations,
                            &stream_actors,
                            &mut source_splits,
                            &mut background_jobs,
                            &mut mv_depended_subscriptions,
                            is_paused,
                            &hummock_version_stats,
                            &mut cdc_table_snapshot_splits,
                        )?
                    };
                    let collector = match result {
                        Ok(database) => {
                            DatabaseInitialBarrierCollector {
                                database_id,
                                initializing_partial_graphs: recoverer.all_initializing(),
                                database,
                            }
                        }
                        Err(e) => {
                            warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
                            assert!(failed_databases.insert(database_id, recoverer.failed()).is_none(), "non-duplicate");
                            continue;
                        }
                    };
                    if !collector.is_collected() {
                        assert!(collecting_databases.insert(database_id, collector).is_none());
                    } else {
                        warn!(%database_id, "database has no node to inject initial barrier");
                        assert!(collected_databases.insert(database_id, collector.finish()).is_none());
                    }
                }
                if !empty_databases.is_empty() {
                    info!(?empty_databases, "empty database in global recovery");
                }
                while !collecting_databases.is_empty() {
                    match partial_graph_manager.next_event(&self.context).await {
                        PartialGraphManagerEvent::Worker(_, WorkerEvent::WorkerConnected) => {}
                        PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
                            let affected_databases: HashSet<_> = affected_partial_graphs.into_iter().map(|partial_graph_id| {
                                let (database_id, _) = from_partial_graph_id(partial_graph_id);
                                database_id
                            }).collect();
                            warn!(%worker_id, err = %err.as_report(), "worker node failure during recovery");
                            for (failed_database_id, collector) in collecting_databases.extract_if(|database_id, collector| {
                                !collector.is_valid_after_worker_err(worker_id) || affected_databases.contains(database_id)
                            }) {
                                warn!(%failed_database_id, %worker_id, "database failed to recover in global recovery due to worker node error");
                                let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
                                partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
                                assert!(failed_databases.insert(failed_database_id, resetting_partial_graphs).is_none());
                            }
                        }
                        PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
                            match event {
                                PartialGraphEvent::BarrierCollected(_) => {
                                    unreachable!("no barrier collected event on initializing")
                                }
                                PartialGraphEvent::Reset(_) => {
                                    unreachable!("no partial graph reset on initializing")
                                }
                                PartialGraphEvent::Error(worker_id) => {
                                    let (database_id, _) = from_partial_graph_id(partial_graph_id);
                                    if let Some(collector) = collecting_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database reset during global recovery");
                                        let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
                                        partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
                                        assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
                                    } else if let Some(database) = collected_databases.remove(&database_id) {
                                        warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
                                        let resetting_partial_graphs: HashSet<_> = database_partial_graphs(database_id, database.creating_streaming_job_controls.keys().copied()).collect();
                                        partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
                                        assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
                                    } else {
                                        assert!(failed_databases.contains_key(&database_id));
                                    }
                                }
                                PartialGraphEvent::Initialized => {
                                    let (database_id, _) = from_partial_graph_id(partial_graph_id);
                                    if failed_databases.contains_key(&database_id) {
                                        assert!(!collecting_databases.contains_key(&database_id));
                                        continue;
                                    }
                                    let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
                                        unreachable!("should exist")
                                    };
                                    let collector = entry.get_mut();
                                    collector.partial_graph_initialized(partial_graph_id);
                                    if collector.is_collected() {
                                        let collector = entry.remove();
                                        assert!(collected_databases.insert(database_id, collector.finish()).is_none());
                                    }
                                }
                            }
                        }
                    }
                }
                debug!("collected initial barrier");
                if !background_jobs.is_empty() {
                    warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background mviews in recovery");
                }
                if !mv_depended_subscriptions.is_empty() {
                    warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
                }
                if !state_table_committed_epochs.is_empty() {
                    warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
                }
                if !enable_per_database_isolation && !failed_databases.is_empty() {
                    return Err(anyhow!(
                        "global recovery failed due to failure of databases {:?}",
                        failed_databases.keys().collect_vec()
                    )
                    .into());
                }
                let checkpoint_control = CheckpointControl::recover(
                    collected_databases,
                    failed_databases,
                    hummock_version_stats,
                    self.env.clone(),
                );

                let reader = self.env.system_params_reader().await;
                let checkpoint_frequency = reader.checkpoint_frequency();
                let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
                let periodic_barriers = PeriodicBarriers::new(
                    barrier_interval,
                    checkpoint_frequency,
                    database_infos,
                );

                Ok((
                    active_streaming_nodes,
                    partial_graph_manager,
                    checkpoint_control,
                    periodic_barriers,
                ))
            }
        }.inspect_err(|err: &MetaError| {
            tracing::error!(error = %err.as_report(), "recovery failed");
            event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
            )]);
            GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
        }))
        .instrument(tracing::info_span!("recovery_attempt"));
        let mut recover_txs = vec![];
        let mut update_barrier_requests = vec![];
        pin_mut!(recovery_future);
        let mut request_rx_closed = false;
        let new_state = loop {
            select! {
                biased;
                new_state = &mut recovery_future => {
                    break new_state.expect("Retry until recovery success.");
                }
                request = pin!(self.request_rx.recv()), if !request_rx_closed => {
                    let Some(request) = request else {
                        warn!("request rx channel closed during recovery");
                        request_rx_closed = true;
                        continue;
                    };
                    match request {
                        BarrierManagerRequest::GetBackfillProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::GetFragmentBackfillProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::GetCdcProgress(tx) => {
                            let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
                        }
                        BarrierManagerRequest::AdhocRecovery(tx) => {
                            recover_txs.push(tx);
                        }
                        BarrierManagerRequest::UpdateDatabaseBarrier(request) => {
                            update_barrier_requests.push(request);
                        }
                        BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
                            // Conservatively answer `true` while recovering.
                            let _ = tx.send(true);
                        }
                    }
                }
            }
        };

        let duration = recovery_timer.stop_and_record();

        (
            self.active_streaming_nodes,
            self.partial_graph_manager,
            self.checkpoint_control,
            self.periodic_barriers,
        ) = new_state;

        tracing::info!("recovery success");

        for UpdateDatabaseBarrierRequest {
            database_id,
            barrier_interval_ms,
            checkpoint_frequency,
            sender,
        } in update_barrier_requests
        {
            self.periodic_barriers.update_database_barrier(
                database_id,
                barrier_interval_ms,
                checkpoint_frequency,
            );
            let _ = sender.send(());
        }

        for tx in recover_txs {
            let _ = tx.send(());
        }

        let recovering_databases = self
            .checkpoint_control
            .recovering_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();
        let running_databases = self
            .checkpoint_control
            .running_databases()
            .map(|database| database.as_raw_id())
            .collect_vec();

        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::global_recovery_success(
                recovery_reason.clone(),
                duration as f32,
                running_databases,
                recovering_databases,
            ),
        )]);

        self.env
            .notification_manager()
            .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
        self.env
            .notification_manager()
            .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
    }
}