1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::pin::pin;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use arc_swap::ArcSwap;
24use futures::{TryFutureExt, pin_mut};
25use itertools::Itertools;
26use risingwave_common::catalog::DatabaseId;
27use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
28use risingwave_common::system_param::reader::SystemParamsRead;
29use risingwave_meta_model::WorkerId;
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::meta::Recovery;
32use risingwave_pb::meta::subscribe_response::{Info, Operation};
33use thiserror_ext::AsReport;
34use tokio::select;
35use tokio::sync::mpsc;
36use tokio::sync::oneshot::{Receiver, Sender};
37use tokio::task::JoinHandle;
38use tonic::Status;
39use tracing::{Instrument, debug, error, info, warn};
40
41use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
42use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
43use crate::barrier::context::recovery::{RenderedDatabaseRuntimeInfo, render_runtime_info};
44use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
45use crate::barrier::info::InflightDatabaseInfo;
46use crate::barrier::rpc::{
47 DatabaseInitialBarrierCollector, database_partial_graphs, from_partial_graph_id,
48 merge_node_rpc_errors,
49};
50use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
51use crate::barrier::{
52 BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
53 RecoveryReason, RescheduleContext, UpdateDatabaseBarrierRequest, schedule,
54};
55use crate::controller::scale::{materialize_actor_assignments, preview_actor_assignments};
56use crate::error::MetaErrorInner;
57use crate::hummock::HummockManagerRef;
58use crate::manager::sink_coordination::SinkCoordinatorManager;
59use crate::manager::{
60 ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
61 MetadataManager,
62};
63use crate::rpc::metrics::GLOBAL_META_METRICS;
64use crate::stream::{
65 GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef, build_reschedule_commands,
66 rendered_layout_matches_current,
67};
68use crate::{MetaError, MetaResult};
69
/// State owned by the global barrier worker event loop.
///
/// `C` is the context used to reach the rest of the meta node. Most methods in this file
/// require `C: GlobalBarrierWorkerContext`.
pub(super) struct GlobalBarrierWorker<C> {
    /// Copied from `env.opts.enable_recovery` at construction. When `false`, the failure
    /// paths in this file panic instead of running recovery.
    enable_recovery: bool,

    /// Yields the next barrier to inject (`next_barrier`). Its interval and checkpoint
    /// frequency are updated on system parameter changes and per-database requests.
    periodic_barriers: PeriodicBarriers,

    /// The `per_database_isolation` system parameter, refreshed on
    /// `LocalNotification::SystemParamsChange`. The effective switch additionally requires
    /// the license check in `enable_per_database_isolation`.
    system_enable_per_database_isolation: bool,

    /// Shared context handle; cloned when it has to be handed to the partial graph manager.
    pub(super) context: Arc<C>,

    /// Meta service environment (options, system params, notification / event log managers).
    env: MetaSrvEnv,

    /// Per-database checkpoint state; consumes new and collected barriers and emits
    /// `CheckpointControlEvent`s.
    checkpoint_control: CheckpointControl,

    /// The barrier-completion task currently in flight, if any. Starts as
    /// `CompletingTask::None` and is reset to it in `clear_on_err`.
    completing_task: CompletingTask,

    /// Incoming `BarrierManagerRequest`s. When this stream ends the event loop returns.
    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    /// Tracker of active streaming workers; constructed uninitialized in `new_inner`.
    active_streaming_nodes: ActiveStreamingWorkerNodes,

    /// Manager of partial graphs and worker connections; constructed uninitialized in
    /// `new_inner`.
    partial_graph_manager: PartialGraphManager,
}
105
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tokio::sync::oneshot;

    use super::*;
    use crate::barrier::RescheduleContext;
    use crate::barrier::notifier::Notifier;

    /// Resolving an unresolved reschedule intent with no worker nodes must drop the barrier
    /// (`Ok(None)`) and report a start failure through the notifier.
    #[tokio::test]
    async fn test_reschedule_intent_without_workers_notifies_start_failed() {
        let env = MetaSrvEnv::for_test().await;
        let database_id = DatabaseId::new(1);
        let shared_actor_infos = env.shared_actor_infos().clone();
        let inflight_info = InflightDatabaseInfo::empty(database_id, shared_actor_infos);

        // The receiving half of `collected` is kept bound so that it stays alive for the
        // duration of the test, even though it is never read.
        let (started_tx, started_rx) = oneshot::channel();
        let (collected_tx, _collected_rx) = oneshot::channel();
        let notifier = Notifier {
            started: Some(started_tx),
            collected: Some(collected_tx),
        };

        let intent = Command::RescheduleIntent {
            context: RescheduleContext::empty(),
            reschedule_plan: None,
        };
        let barrier = schedule::NewBarrier {
            database_id,
            command: Some((intent, vec![notifier])),
            span: tracing::Span::none(),
            checkpoint: false,
        };

        // An empty worker map is the condition under test.
        let no_workers = HashMap::new();
        let result = resolve_reschedule_intent(env, no_workers, Some(&inflight_info), barrier);
        assert!(matches!(result, Ok(None)));

        let started = started_rx.await.expect("started notifier dropped");
        assert!(started.is_err());
    }
}
151
152impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
153 pub(super) async fn new_inner(
154 env: MetaSrvEnv,
155 request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
156 context: Arc<C>,
157 ) -> Self {
158 let enable_recovery = env.opts.enable_recovery;
159
160 let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();
161
162 let partial_graph_manager = PartialGraphManager::uninitialized(env.clone());
163
164 let reader = env.system_params_reader().await;
165 let system_enable_per_database_isolation = reader.per_database_isolation();
166 let periodic_barriers = PeriodicBarriers::default();
168
169 let checkpoint_control = CheckpointControl::new(env.clone());
170 Self {
171 enable_recovery,
172 periodic_barriers,
173 system_enable_per_database_isolation,
174 context,
175 env,
176 checkpoint_control,
177 completing_task: CompletingTask::None,
178 request_rx,
179 active_streaming_nodes,
180 partial_graph_manager,
181 }
182 }
183}
184
185fn resolve_reschedule_intent(
186 env: MetaSrvEnv,
187 worker_nodes: HashMap<WorkerId, WorkerNode>,
188 database_info: Option<&InflightDatabaseInfo>,
189 mut new_barrier: schedule::NewBarrier,
190) -> MetaResult<Option<schedule::NewBarrier>> {
191 let Some((command, notifiers)) = new_barrier.command.take() else {
192 return Ok(Some(new_barrier));
193 };
194
195 match command {
196 Command::RescheduleIntent {
197 context,
198 reschedule_plan,
199 } => {
200 if let Some(reschedule_plan) = reschedule_plan {
201 new_barrier.command = Some((
202 Command::RescheduleIntent {
203 context,
204 reschedule_plan: Some(reschedule_plan),
205 },
206 notifiers,
207 ));
208 return Ok(Some(new_barrier));
209 }
210 let span = tracing::info_span!(
211 "resolve_reschedule_intent",
212 database_id = %new_barrier.database_id
213 );
214 let reschedule_plan = {
215 let _guard = span.enter();
216 build_reschedule_from_context(
217 &env,
218 worker_nodes,
219 new_barrier.database_id,
220 context,
221 database_info.ok_or_else(|| {
222 anyhow!(
223 "database {} not found when resolving reschedule intent",
224 new_barrier.database_id
225 )
226 })?,
227 )
228 };
229 match reschedule_plan {
230 Ok(Some(reschedule_plan)) => {
231 new_barrier.command = Some((
232 Command::RescheduleIntent {
233 context: RescheduleContext::empty(),
234 reschedule_plan: Some(reschedule_plan),
235 },
236 notifiers,
237 ));
238 Ok(Some(new_barrier))
239 }
240 Ok(None) => {
241 for mut notifier in notifiers {
243 notifier.notify_started();
244 notifier.notify_collected();
245 }
246 Ok(None)
247 }
248 Err(err) => {
249 for notifier in notifiers {
250 notifier.notify_start_failed(err.clone());
251 }
252 Ok(None)
253 }
254 }
255 }
256 _ => {
257 new_barrier.command = Some((command, notifiers));
258 Ok(Some(new_barrier))
259 }
260 }
261}
262
263fn build_reschedule_from_context(
264 env: &MetaSrvEnv,
265 worker_nodes: HashMap<WorkerId, WorkerNode>,
266 database_id: DatabaseId,
267 context: RescheduleContext,
268 database_info: &InflightDatabaseInfo,
269) -> MetaResult<Option<crate::barrier::ReschedulePlan>> {
270 if worker_nodes.is_empty() {
271 return Err(anyhow!("no active streaming workers for reschedule").into());
272 }
273
274 if context.is_empty() {
275 return Ok(None);
276 }
277
278 let all_prev_fragments = database_info
281 .fragment_infos()
282 .map(|fragment| (fragment.fragment_id, fragment))
283 .collect();
284
285 let previewed = preview_actor_assignments(&worker_nodes, &context.loaded)?;
286
287 if rendered_layout_matches_current(&previewed.fragments, &all_prev_fragments)? {
288 return Ok(None);
289 }
290
291 let actor_id_counter = env.actor_id_generator();
292 let rendered = materialize_actor_assignments(actor_id_counter, previewed);
295 let mut commands = build_reschedule_commands(rendered.fragments, context, all_prev_fragments)?;
296 Ok(commands.remove(&database_id))
297}
298
299impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
300 pub async fn new(
302 scheduled_barriers: schedule::ScheduledBarriers,
303 env: MetaSrvEnv,
304 metadata_manager: MetadataManager,
305 hummock_manager: HummockManagerRef,
306 source_manager: SourceManagerRef,
307 sink_manager: SinkCoordinatorManager,
308 scale_controller: ScaleControllerRef,
309 request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
310 barrier_scheduler: schedule::BarrierScheduler,
311 refresh_manager: GlobalRefreshManagerRef,
312 ) -> Self {
313 let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));
314
315 let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
316 scheduled_barriers,
317 status,
318 metadata_manager,
319 hummock_manager,
320 source_manager,
321 scale_controller,
322 env.clone(),
323 barrier_scheduler,
324 refresh_manager,
325 sink_manager,
326 ));
327
328 Self::new_inner(env, request_rx, context).await
329 }
330
331 pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
332 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
333 let fut = (self.env.await_tree_reg())
334 .register_derived_root("Global Barrier Worker")
335 .instrument(self.run(shutdown_rx));
336 let join_handle = tokio::spawn(fut);
337
338 (join_handle, shutdown_tx)
339 }
340
341 async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
343 let paused = self
344 .env
345 .system_params_reader()
346 .await
347 .pause_on_next_bootstrap()
348 || self.env.opts.pause_on_next_bootstrap_offline;
349
350 if paused {
351 warn!(
352 "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
353 It will now be reset to `false`. \
354 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
355 PAUSE_ON_NEXT_BOOTSTRAP_KEY
356 );
357 self.env
358 .system_params_manager_impl_ref()
359 .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
360 .await?;
361 }
362 Ok(paused)
363 }
364
365 async fn run(mut self, shutdown_rx: Receiver<()>) {
367 tracing::info!(
368 "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
369 self.enable_recovery,
370 self.checkpoint_control.in_flight_barrier_nums,
371 );
372
373 if !self.enable_recovery {
374 let job_exist = self
375 .context
376 .metadata_manager
377 .catalog_controller
378 .has_any_streaming_jobs()
379 .await
380 .unwrap();
381 if job_exist {
382 panic!(
383 "Some streaming jobs already exist in meta, please start with recovery enabled \
384 or clean up the metadata using `./risedev clean-data`"
385 );
386 }
387 }
388
389 {
390 let span = tracing::info_span!("bootstrap_recovery");
395 crate::telemetry::report_event(
396 risingwave_pb::telemetry::TelemetryEventStage::Recovery,
397 "normal_recovery",
398 0,
399 None,
400 None,
401 None,
402 );
403
404 let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
405
406 self.recovery(paused, RecoveryReason::Bootstrap)
407 .instrument(span)
408 .await;
409 }
410
411 Box::pin(self.run_inner(shutdown_rx)).await
412 }
413}
414
415impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
416 fn enable_per_database_isolation(&self) -> bool {
417 self.system_enable_per_database_isolation && {
418 if let Err(e) =
419 risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
420 {
421 warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
422 false
423 } else {
424 true
425 }
426 }
427 }
428
429 pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
430 let (local_notification_tx, mut local_notification_rx) =
431 tokio::sync::mpsc::unbounded_channel();
432 self.env
433 .notification_manager()
434 .insert_local_sender(local_notification_tx);
435
436 loop {
438 tokio::select! {
439 biased;
440
441 _ = &mut shutdown_rx => {
443 tracing::info!("Barrier manager is stopped");
444 break;
445 }
446
447 request = self.request_rx.recv() => {
448 if let Some(request) = request {
449 match request {
450 BarrierManagerRequest::GetBackfillProgress(result_tx) => {
451 let progress = self.checkpoint_control.gen_backfill_progress();
452 if result_tx.send(Ok(progress)).is_err() {
453 error!("failed to send get ddl progress");
454 }
455 }
456 BarrierManagerRequest::GetFragmentBackfillProgress(result_tx) => {
457 let progress =
458 self.checkpoint_control.gen_fragment_backfill_progress();
459 if result_tx.send(Ok(progress)).is_err() {
460 error!("failed to send get fragment backfill progress");
461 }
462 }
463 BarrierManagerRequest::GetCdcProgress(result_tx) => {
464 let progress = self.checkpoint_control.gen_cdc_progress();
465 if result_tx.send(Ok(progress)).is_err() {
466 error!("failed to send get ddl progress");
467 }
468 }
469 BarrierManagerRequest::AdhocRecovery(sender) => {
471 self.adhoc_recovery().await;
472 if sender.send(()).is_err() {
473 warn!("failed to notify finish of adhoc recovery");
474 }
475 }
476 BarrierManagerRequest::UpdateDatabaseBarrier( UpdateDatabaseBarrierRequest {
477 database_id,
478 barrier_interval_ms,
479 checkpoint_frequency,
480 sender,
481 }) => {
482 self.periodic_barriers
483 .update_database_barrier(
484 database_id,
485 barrier_interval_ms,
486 checkpoint_frequency,
487 );
488 if sender.send(()).is_err() {
489 warn!("failed to notify finish of update database barrier");
490 }
491 }
492 BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
493 if tx.send(self.checkpoint_control.may_have_snapshot_backfilling_jobs()).is_err() {
494 warn!("failed to may have snapshot backfill job");
495 }
496 }
497 }
498 } else {
499 tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
500 return;
501 }
502 }
503
504 changed_worker = self.active_streaming_nodes.changed() => {
505 #[cfg(debug_assertions)]
506 {
507 self.active_streaming_nodes.validate_change().await;
508 }
509
510 info!(?changed_worker, "worker changed");
511
512 match changed_worker {
513 ActiveStreamingWorkerChange::Add(node)
514 | ActiveStreamingWorkerChange::Update(node) => {
515 self.partial_graph_manager
516 .add_worker(node, self.context.clone())
517 .await;
518 }
519 ActiveStreamingWorkerChange::Remove(node) => {
520 self.partial_graph_manager.remove_worker(node);
521 }
522 }
523 }
524
525 notification = local_notification_rx.recv() => {
526 let notification = notification.unwrap();
527 if let LocalNotification::SystemParamsChange(p) = notification {
528 {
529 self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
530 self.periodic_barriers
531 .set_sys_checkpoint_frequency(p.checkpoint_frequency());
532 self.system_enable_per_database_isolation = p.per_database_isolation();
533 }
534 }
535 }
536 complete_result = self
537 .completing_task
538 .next_completed_barrier(
539 &mut self.periodic_barriers,
540 &mut self.checkpoint_control,
541 &mut self.partial_graph_manager,
542 &self.context,
543 &self.env,
544 ) => {
545 match complete_result {
546 Ok(output) => {
547 self.checkpoint_control.ack_completed(&mut self.partial_graph_manager, output);
548 }
549 Err(e) => {
550 self.failure_recovery(e).await;
551 }
552 }
553 },
554 event = self.checkpoint_control.next_event() => {
555 let result: MetaResult<()> = try {
556 match event {
557 CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
558 let database_id = entering_initializing.database_id();
559 let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
560 resp.root_err.as_ref().map(|root_err| {
561 (*worker_id, ScoredError {
562 error: Status::internal(&root_err.err_msg),
563 score: Score(root_err.score)
564 })
565 })
566 }));
567 Self::report_collect_failure(&self.env, &error);
568 self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
569 let result: MetaResult<_> = try {
570 let runtime_info = self.context.reload_database_runtime_info(database_id).await.inspect_err(|err| {
571 warn!(%database_id, err = %err.as_report(), "reload runtime info failed");
572 })?;
573 let rendered_info = render_runtime_info(
574 self.env.actor_id_generator(),
575 &self.active_streaming_nodes,
576 &runtime_info.recovery_context,
577 database_id,
578 )
579 .inspect_err(|err: &MetaError| {
580 warn!(%database_id, err = %err.as_report(), "render runtime info failed");
581 })?;
582 if let Some(rendered_info) = rendered_info {
583 BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
584 database_id,
585 &rendered_info.job_infos,
586 &self.active_streaming_nodes,
587 &rendered_info.stream_actors,
588 &runtime_info.state_table_committed_epochs,
589 )
590 .inspect_err(|err| {
591 warn!(%database_id, err = ?err.as_report(), "database runtime info failed validation");
592 })?;
593 Some((runtime_info, rendered_info))
594 } else {
595 None
596 }
597 };
598 match result {
599 Ok(Some((runtime_info, rendered_info))) => {
600 entering_initializing.enter(
601 runtime_info,
602 rendered_info,
603 &mut self.partial_graph_manager,
604 );
605 }
606 Ok(None) => {
607 info!(%database_id, "database removed after reloading empty runtime info");
608 self.context.mark_ready(MarkReadyOptions::Database(database_id));
610 entering_initializing.remove();
611 }
612 Err(e) => {
613 entering_initializing.fail_reload_runtime_info(e);
614 }
615 }
616 }
617 CheckpointControlEvent::EnteringRunning(entering_running) => {
618 self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
619 entering_running.enter();
620 }
621 }
622 };
623 if let Err(e) = result {
624 self.failure_recovery(e).await;
625 }
626 }
627 event = self.partial_graph_manager.next_event(&self.context) => {
628 let result: MetaResult<()> = try {
629 match event {
630 PartialGraphManagerEvent::Worker(_worker_id, WorkerEvent::WorkerConnected) => {
631 }
633 PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
634 let failed_databases = self
635 .checkpoint_control
636 .databases_failed_at_worker_err(worker_id)
637 .chain(
638 affected_partial_graphs
639 .into_iter()
640 .map(|partial_graph_id| {
641 let (database_id, _) = from_partial_graph_id(partial_graph_id);
642 database_id
643 })
644 )
645 .collect::<HashSet<_>>();
646 if !failed_databases.is_empty() {
647 if !self.enable_recovery {
648 panic!("control stream to worker {} failed but recovery not enabled: {}", worker_id, err.as_report());
649 }
650 if !self.enable_per_database_isolation() {
651 Err(err.clone())?;
652 }
653 Self::report_collect_failure(&self.env, &err);
654 for database_id in failed_databases {
655 if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
656 warn!(%worker_id, %database_id, "database entering recovery on node failure");
657 self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
658 self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
659 let output = self.completing_task.wait_completing_task().await?;
661 entering_recovery.enter(output, &mut self.partial_graph_manager);
662 }
663 }
664 } else {
665 warn!(%worker_id, "no barrier to collect from worker, ignore err");
666 }
667 continue;
668 }
669 PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
670 let (database_id, _creating_job_id) = from_partial_graph_id(partial_graph_id);
671 match event {
672 PartialGraphEvent::BarrierCollected(collected_barrier) => {
673 self.checkpoint_control.barrier_collected(partial_graph_id, collected_barrier, &mut self.periodic_barriers)?;
674 }
675 PartialGraphEvent::Error(worker_id) => {
676 if !self.enable_recovery {
677 panic!("database {database_id} failure reported from {worker_id} but recovery not enabled")
678 }
679 if !self.enable_per_database_isolation() {
680 Err(anyhow!("database {database_id} report failure from {worker_id}"))?;
681 }
682 if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
683 warn!(%database_id, "database entering recovery");
684 self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
685 let output = self.completing_task.wait_completing_task().await?;
687 entering_recovery.enter(output, &mut self.partial_graph_manager);
688 }
689 }
690 PartialGraphEvent::Reset(reset_resps) => {
691 self.checkpoint_control.on_partial_graph_reset(partial_graph_id, reset_resps);
692 }
693 PartialGraphEvent::Initialized => {
694 self.checkpoint_control.on_partial_graph_initialized(partial_graph_id);
695 }
696 }
697 }
698 };
699 };
700 if let Err(e) = result {
701 self.failure_recovery(e).await;
702 }
703 }
704 new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
705 let database_id = new_barrier.database_id;
706 let new_barrier = if matches!(
707 new_barrier.command,
708 Some((Command::RescheduleIntent { .. }, _))
709 ) {
710 let env = self.env.clone();
711 let worker_nodes = self
712 .active_streaming_nodes
713 .current()
714 .iter()
715 .map(|(worker_id, worker)| (*worker_id, worker.clone()))
716 .collect();
717 let database_info = self.checkpoint_control.database_info(database_id);
718 match resolve_reschedule_intent(
719 env,
720 worker_nodes,
721 database_info,
722 new_barrier,
723 ) {
724 Ok(Some(new_barrier)) => new_barrier,
725 Ok(None) => continue,
726 Err(err) => {
727 self.failure_recovery(err).await;
728 continue;
729 }
730 }
731 } else {
732 new_barrier
733 };
734 if let Err(e) = self.checkpoint_control.handle_new_barrier(
735 new_barrier,
736 &mut self.partial_graph_manager,
737 self.active_streaming_nodes.current()
738 ) {
739 if !self.enable_recovery {
740 panic!(
741 "failed to inject barrier to some databases but recovery not enabled: {:?}", (
742 database_id,
743 e.as_report()
744 )
745 );
746 }
747 let result: MetaResult<_> = try {
748 if !self.enable_per_database_isolation() {
749 let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
750 Err(err)?;
751 } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
752 warn!(%database_id, e = %e.as_report(),"database entering recovery on inject failure");
753 self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
754 let output = self.completing_task.wait_completing_task().await?;
756 entering_recovery.enter(output, &mut self.partial_graph_manager);
757 }
758 };
759 if let Err(e) = result {
760 self.failure_recovery(e).await;
761 }
762 }
763 }
764 }
765 }
766 }
767}
768
769impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
770 pub async fn clear_on_err(&mut self, err: &MetaError) {
772 match replace(&mut self.completing_task, CompletingTask::None) {
774 CompletingTask::None | CompletingTask::Err(_) => {}
775 CompletingTask::Completing {
776 epochs_to_ack,
777 join_handle,
778 ..
779 } => {
780 info!("waiting for completing command to finish in recovery");
781 match join_handle.await {
782 Err(e) => {
783 warn!(err = %e.as_report(), "failed to join completing task");
784 }
785 Ok(Err(e)) => {
786 warn!(
787 err = %e.as_report(),
788 "failed to complete barrier during clear"
789 );
790 }
791 Ok(Ok(hummock_version_stats)) => {
792 self.checkpoint_control.ack_completed(
793 &mut self.partial_graph_manager,
794 BarrierCompleteOutput {
795 epochs_to_ack,
796 hummock_version_stats,
797 },
798 );
799 }
800 }
801 }
802 };
803 self.partial_graph_manager.notify_all_err(err);
804 }
805}
806
807impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
808 async fn failure_recovery(&mut self, err: MetaError) {
810 self.clear_on_err(&err).await;
811
812 if self.enable_recovery {
813 let span = tracing::info_span!(
814 "failure_recovery",
815 error = %err.as_report(),
816 );
817
818 crate::telemetry::report_event(
819 risingwave_pb::telemetry::TelemetryEventStage::Recovery,
820 "failure_recovery",
821 0,
822 None,
823 None,
824 None,
825 );
826
827 let reason = RecoveryReason::Failover(err);
828
829 self.recovery(false, reason).instrument(span).await;
832 } else {
833 panic!(
834 "a streaming error occurred while recovery is disabled, aborting: {:?}",
835 err.as_report()
836 );
837 }
838 }
839
840 async fn adhoc_recovery(&mut self) {
841 let err = MetaErrorInner::AdhocRecovery.into();
842 self.clear_on_err(&err).await;
843
844 let span = tracing::info_span!(
845 "adhoc_recovery",
846 error = %err.as_report(),
847 );
848
849 crate::telemetry::report_event(
850 risingwave_pb::telemetry::TelemetryEventStage::Recovery,
851 "adhoc_recovery",
852 0,
853 None,
854 None,
855 None,
856 );
857
858 self.recovery(false, RecoveryReason::Adhoc)
861 .instrument(span)
862 .await;
863 }
864}
865
866impl<C> GlobalBarrierWorker<C> {
867 pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
869 use risingwave_pb::meta::event_log;
871 let event = event_log::EventCollectBarrierFail {
872 error: error.to_report_string(),
873 };
874 env.event_log_manager_ref()
875 .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
876 }
877}
878
/// Backoff helpers used when retrying recovery.
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    /// Value handed to `ExponentialBackoff::from_millis`.
    // NOTE(review): tokio-retry appears to use this value as the base of the exponent
    // (delays of base^n ms) rather than as a fixed multiplier — confirm against its docs.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    /// Upper bound applied to each backoff delay before jitter.
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    /// A boxed, pinned `tokio::time::Sleep` for a single backoff delay.
    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    /// Creates a future that sleeps for `duration`.
    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    /// Opaque iterator of backoff futures; its concrete type is defined by
    /// `get_retry_backoff_strategy`.
    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// Returns the delays used between recovery attempts: exponential backoff capped at
    /// `RECOVERY_RETRY_MAX_INTERVAL`, with jitter applied to each delay.
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    /// Same delays as `get_retry_strategy`, each one turned into a sleep future.
    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}
929
930pub(crate) use retry_strategy::*;
931use risingwave_common::error::tonic::extra::{Score, ScoredError};
932use risingwave_pb::meta::event_log::{Event, EventRecovery};
933
934use crate::barrier::partial_graph::{
935 PartialGraphEvent, PartialGraphManager, PartialGraphManagerEvent, WorkerEvent,
936};
937
938impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
939 pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
947 self.partial_graph_manager.clear_worker();
949
950 let reason_str = match &recovery_reason {
951 RecoveryReason::Bootstrap => "bootstrap".to_owned(),
952 RecoveryReason::Failover(err) => {
953 format!("failed over: {}", err.as_report())
954 }
955 RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
956 };
957 self.context.abort_and_mark_blocked(None, recovery_reason);
958
959 self.recovery_inner(is_paused, reason_str).await;
960 self.context.mark_ready(MarkReadyOptions::Global {
961 blocked_databases: self.checkpoint_control.recovering_databases().collect(),
962 });
963 }
964
965 #[await_tree::instrument("recovery({recovery_reason})")]
966 async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
967 let event_log_manager_ref = self.env.event_log_manager_ref();
968
969 tracing::info!("recovery start!");
970 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
971 EventRecovery::global_recovery_start(recovery_reason.clone()),
972 )]);
973
974 let retry_strategy = get_retry_strategy();
975
976 let recovery_timer = GLOBAL_META_METRICS
979 .recovery_latency
980 .with_label_values(&["global"])
981 .start_timer();
982
983 let enable_per_database_isolation = self.enable_per_database_isolation();
984
985 let recovery_future = tokio_retry::Retry::spawn(retry_strategy, || async {
986 self.env.stream_client_pool().invalidate_all();
987 self.context
992 .notify_creating_job_failed(None, recovery_reason.clone())
993 .await;
994
995 let runtime_info_snapshot = self
996 .context
997 .reload_runtime_info()
998 .await?;
999 let BarrierWorkerRuntimeInfoSnapshot {
1000 active_streaming_nodes,
1001 recovery_context,
1002 mut state_table_committed_epochs,
1003 mut state_table_log_epochs,
1004 mut mv_depended_subscriptions,
1005 mut background_jobs,
1006 hummock_version_stats,
1007 database_infos,
1008 mut cdc_table_snapshot_splits,
1009 } = runtime_info_snapshot;
1010
1011 let mut partial_graph_manager = PartialGraphManager::recover(
1012 self.env.clone(),
1013 active_streaming_nodes.current(),
1014 self.context.clone(),
1015 )
1016 .await;
1017 {
1018 let mut empty_databases = HashSet::new();
1019 let mut collected_databases = HashMap::new();
1020 let mut collecting_databases = HashMap::new();
1021 let mut failed_databases = HashMap::new();
1022 for &database_id in recovery_context.fragment_context.database_map.keys() {
1023 let mut recoverer = partial_graph_manager.start_recover();
1024 let result: MetaResult<_> = try {
1025 let Some(rendered_info) = render_runtime_info(
1026 self.env.actor_id_generator(),
1027 &active_streaming_nodes,
1028 &recovery_context,
1029 database_id,
1030 )
1031 .inspect_err(|err: &MetaError| {
1032 warn!(%database_id, err = %err.as_report(), "render runtime info failed");
1033 })? else {
1034 empty_databases.insert(database_id);
1035 continue;
1036 };
1037 BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
1038 database_id,
1039 &rendered_info.job_infos,
1040 &active_streaming_nodes,
1041 &rendered_info.stream_actors,
1042 &state_table_committed_epochs,
1043 )
1044 .inspect_err(|err| {
1045 warn!(%database_id, err = %err.as_report(), "rendered runtime info failed validation");
1046 })?;
1047 let RenderedDatabaseRuntimeInfo {
1048 job_infos,
1049 stream_actors,
1050 mut source_splits,
1051 batch_refresh,
1052 } = rendered_info;
1053 recoverer.inject_database_initial_barrier(
1054 database_id,
1055 job_infos,
1056 &recovery_context.job_extra_info,
1057 &mut state_table_committed_epochs,
1058 &mut state_table_log_epochs,
1059 &recovery_context.fragment_relations,
1060 &stream_actors,
1061 &mut source_splits,
1062 &mut background_jobs,
1063 &mut mv_depended_subscriptions,
1064 is_paused,
1065 &hummock_version_stats,
1066 &mut cdc_table_snapshot_splits,
1067 batch_refresh,
1068 )?
1069 };
1070 let collector = match result {
1071 Ok(database) => {
1072 DatabaseInitialBarrierCollector {
1073 database_id,
1074 initializing_partial_graphs: recoverer.all_initializing(),
1075 database,
1076 }
1077 }
1078 Err(e) => {
1079 warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
1080 assert!(failed_databases.insert(database_id, recoverer.failed()).is_none(), "non-duplicate");
1081 continue;
1082 }
1083 };
1084 if !collector.is_collected() {
1085 assert!(collecting_databases.insert(database_id, collector).is_none());
1086 } else {
1087 warn!(%database_id, "database has no node to inject initial barrier");
1088 assert!(collected_databases.insert(database_id, collector.finish()).is_none());
1089 }
1090 }
1091 if !empty_databases.is_empty() {
1092 info!(?empty_databases, "empty database in global recovery");
1093 }
1094 while !collecting_databases.is_empty() {
1095 match partial_graph_manager.next_event(&self.context).await {
1096 PartialGraphManagerEvent::Worker(_, WorkerEvent::WorkerConnected) => {
1097 }
1099 PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
1100 let affected_databases: HashSet<_> = affected_partial_graphs.into_iter().map(|partial_graph_id| {
1101 let (database_id, _) = from_partial_graph_id(partial_graph_id);
1102 database_id
1103 }).collect();
1104 warn!(%worker_id, err = %err.as_report(), "worker node failure during recovery");
1105 for (failed_database_id, collector) in collecting_databases.extract_if(|database_id, collector| {
1106 !collector.is_valid_after_worker_err(worker_id) || affected_databases.contains(database_id)
1107 }) {
1108 warn!(%failed_database_id, %worker_id, "database failed to recovery in global recovery due to worker node err");
1109 let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
1110 partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1111 assert!(failed_databases.insert(failed_database_id, resetting_partial_graphs).is_none());
1112 }
1113 }
1114 PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
1115 match event {
1116 PartialGraphEvent::BarrierCollected(_) => {
1117 unreachable!("no barrier collected event on initializing")
1118 }
1119 PartialGraphEvent::Reset(_) => {
1120 unreachable!("no partial graph reset on initializing")
1121 }
1122 PartialGraphEvent::Error(worker_id) => {
1123 let (database_id, _) = from_partial_graph_id(partial_graph_id);
1124 if let Some(collector) = collecting_databases.remove(&database_id) {
1125 warn!(%database_id, %worker_id, "database reset during global recovery");
1126 let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
1127 partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1128 assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
1129 } else if let Some(database) = collected_databases.remove(&database_id) {
1130 warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
1131 let resetting_partial_graphs: HashSet<_> = database_partial_graphs(database_id, database.independent_checkpoint_job_controls.keys().copied()).collect();
1132 partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1133 assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
1134 } else {
1135 assert!(failed_databases.contains_key(&database_id));
1136 }
1137 }
1138 PartialGraphEvent::Initialized => {
1139 let (database_id, _) = from_partial_graph_id(partial_graph_id);
1140 if failed_databases.contains_key(&database_id) {
1141 assert!(!collecting_databases.contains_key(&database_id));
1142 continue;
1144 }
1145 let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
1146 unreachable!("should exist")
1147 };
1148 let collector = entry.get_mut();
1149 collector.partial_graph_initialized(partial_graph_id);
1150 if collector.is_collected() {
1151 let collector = entry.remove();
1152 assert!(collected_databases.insert(database_id, collector.finish()).is_none());
1153 }
1154 }
1155 }
1156 }
1157 }
1158 }
1159 debug!("collected initial barrier");
1160 if !background_jobs.is_empty() {
1161 warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background mview in recovery");
1162 }
1163 if !mv_depended_subscriptions.is_empty() {
1164 warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
1165 }
1166 if !state_table_committed_epochs.is_empty() {
1167 warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
1168 }
1169 if !enable_per_database_isolation && !failed_databases.is_empty() {
1170 return Err(anyhow!(
1171 "global recovery failed due to failure of databases {:?}",
1172 failed_databases.keys().collect_vec()).into()
1173 );
1174 }
1175 let checkpoint_control = CheckpointControl::recover(
1176 collected_databases,
1177 failed_databases,
1178 hummock_version_stats,
1179 self.env.clone(),
1180 );
1181
1182 let reader = self.env.system_params_reader().await;
1183 let checkpoint_frequency = reader.checkpoint_frequency();
1184 let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
1185 let periodic_barriers = PeriodicBarriers::new(
1186 barrier_interval,
1187 checkpoint_frequency,
1188 database_infos,
1189 );
1190
1191 Ok((
1192 active_streaming_nodes,
1193 partial_graph_manager,
1194 checkpoint_control,
1195 periodic_barriers,
1196 ))
1197 }
1198 }.inspect_err(|err: &MetaError| {
1199 tracing::error!(error = %err.as_report(), "recovery failed");
1200 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1201 EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
1202 )]);
1203 GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
1204 }))
1205 .instrument(tracing::info_span!("recovery_attempt"));
1206
1207 let mut recover_txs = vec![];
1208 let mut update_barrier_requests = vec![];
1209 pin_mut!(recovery_future);
1210 let mut request_rx_closed = false;
1211 let new_state = loop {
1212 select! {
1213 biased;
1214 new_state = &mut recovery_future => {
1215 break new_state.expect("Retry until recovery success.");
1216 }
1217 request = pin!(self.request_rx.recv()), if !request_rx_closed => {
1218 let Some(request) = request else {
1219 warn!("request rx channel closed during recovery");
1220 request_rx_closed = true;
1221 continue;
1222 };
1223 match request {
1224 BarrierManagerRequest::GetBackfillProgress(tx) => {
1225 let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1226 }
1227 BarrierManagerRequest::GetFragmentBackfillProgress(tx) => {
1228 let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1229 }
1230 BarrierManagerRequest::GetCdcProgress(tx) => {
1231 let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1232 }
1233 BarrierManagerRequest::AdhocRecovery(tx) => {
1234 recover_txs.push(tx);
1235 }
1236 BarrierManagerRequest::UpdateDatabaseBarrier(request) => {
1237 update_barrier_requests.push(request);
1238 }
1239 BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
1240 let _ = tx.send(true);
1242 }
1243 }
1244 }
1245 }
1246 };
1247
1248 let duration = recovery_timer.stop_and_record();
1249
1250 (
1251 self.active_streaming_nodes,
1252 self.partial_graph_manager,
1253 self.checkpoint_control,
1254 self.periodic_barriers,
1255 ) = new_state;
1256
1257 tracing::info!("recovery success");
1258
1259 for UpdateDatabaseBarrierRequest {
1260 database_id,
1261 barrier_interval_ms,
1262 checkpoint_frequency,
1263 sender,
1264 } in update_barrier_requests
1265 {
1266 self.periodic_barriers.update_database_barrier(
1267 database_id,
1268 barrier_interval_ms,
1269 checkpoint_frequency,
1270 );
1271 let _ = sender.send(());
1272 }
1273
1274 for tx in recover_txs {
1275 let _ = tx.send(());
1276 }
1277
1278 let recovering_databases = self
1279 .checkpoint_control
1280 .recovering_databases()
1281 .map(|database| database.as_raw_id())
1282 .collect_vec();
1283 let running_databases = self
1284 .checkpoint_control
1285 .running_databases()
1286 .map(|database| database.as_raw_id())
1287 .collect_vec();
1288
1289 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1290 EventRecovery::global_recovery_success(
1291 recovery_reason.clone(),
1292 duration as f32,
1293 running_databases,
1294 recovering_databases,
1295 ),
1296 )]);
1297
1298 self.env
1299 .notification_manager()
1300 .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
1301 self.env
1302 .notification_manager()
1303 .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
1304 }
1305}