1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::pin::pin;
19use std::sync::Arc;
20use std::time::Duration;
21
22use anyhow::anyhow;
23use arc_swap::ArcSwap;
24use futures::{TryFutureExt, pin_mut};
25use itertools::Itertools;
26use risingwave_common::catalog::DatabaseId;
27use risingwave_common::id::JobId;
28use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
29use risingwave_common::system_param::reader::SystemParamsRead;
30use risingwave_meta_model::WorkerId;
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::meta::Recovery;
33use risingwave_pb::meta::subscribe_response::{Info, Operation};
34use thiserror_ext::AsReport;
35use tokio::select;
36use tokio::sync::mpsc;
37use tokio::sync::oneshot::{Receiver, Sender};
38use tokio::task::JoinHandle;
39use tonic::Status;
40use tracing::{Instrument, debug, error, info, warn};
41
42use crate::barrier::checkpoint::{CheckpointControl, CheckpointControlEvent};
43use crate::barrier::complete_task::{BarrierCompleteOutput, CompletingTask};
44use crate::barrier::context::recovery::{RenderedDatabaseRuntimeInfo, render_runtime_info};
45use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
46use crate::barrier::info::InflightDatabaseInfo;
47use crate::barrier::rpc::{
48 DatabaseInitialBarrierCollector, database_partial_graphs, from_partial_graph_id,
49 merge_node_rpc_errors,
50};
51use crate::barrier::schedule::{MarkReadyOptions, PeriodicBarriers};
52use crate::barrier::{
53 BarrierManagerRequest, BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command,
54 RecoveryReason, RescheduleContext, UpdateDatabaseBarrierRequest, schedule,
55};
56use crate::controller::scale::{materialize_actor_assignments, preview_actor_assignments};
57use crate::error::MetaErrorInner;
58use crate::hummock::HummockManagerRef;
59use crate::manager::iceberg_v3_sink::IcebergV3SinkManager;
60use crate::manager::sink_coordination::SinkCoordinatorManager;
61use crate::manager::{
62 ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
63 MetadataManager,
64};
65use crate::rpc::metrics::GLOBAL_META_METRICS;
66use crate::stream::{
67 GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef, build_reschedule_commands,
68 rendered_layout_matches_current,
69};
70use crate::{MetaError, MetaResult};
71
/// Long-running worker that injects, collects and completes barriers for all databases and
/// drives global / per-database recovery.
///
/// Built by `new_inner` and driven by `run_inner`; for the production context it is spawned
/// through `start`.
pub(super) struct GlobalBarrierWorker<C> {
    /// Copied from `env.opts.enable_recovery` at construction. When `false`, failures that
    /// would otherwise trigger recovery cause a panic instead.
    enable_recovery: bool,

    /// Source of scheduled barriers (`next_barrier`). Its intervals are updated on system
    /// parameter changes and on `UpdateDatabaseBarrier` requests.
    periodic_barriers: PeriodicBarriers,

    /// Cached value of the `per_database_isolation` system parameter. Isolation is only in
    /// effect when `enable_per_database_isolation` also passes its license check.
    system_enable_per_database_isolation: bool,

    /// Barrier worker context. For the production `GlobalBarrierWorkerContextImpl`, it wraps
    /// the metadata, hummock, source and sink managers, among others.
    pub(super) context: Arc<C>,

    env: MetaSrvEnv,

    /// Per-database barrier bookkeeping: in-flight barriers, recovering databases, progress.
    checkpoint_control: CheckpointControl,

    /// The barrier-completion task currently in flight (`CompletingTask::None` when idle).
    completing_task: CompletingTask,

    /// Requests from other meta components: progress queries, ad-hoc recovery, barrier config.
    request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,

    /// Currently active streaming worker nodes. Changes are forwarded to
    /// `partial_graph_manager`.
    active_streaming_nodes: ActiveStreamingWorkerNodes,

    /// Tracks workers and the partial graphs running on them, and surfaces their events.
    /// NOTE(review): it appears to own the per-worker control streams; confirm.
    partial_graph_manager: PartialGraphManager,
}
107
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tokio::sync::oneshot;

    use super::*;
    use crate::barrier::RescheduleContext;
    use crate::barrier::notifier::Notifier;

    /// Resolving an unresolved reschedule intent with no active workers must not yield a
    /// barrier. Instead, the `started` notifier has to receive an error.
    #[tokio::test]
    async fn test_reschedule_intent_without_workers_notifies_start_failed() {
        let env = MetaSrvEnv::for_test().await;
        let database_id = DatabaseId::new(1);
        let database_info =
            InflightDatabaseInfo::empty(database_id, env.shared_actor_infos().clone());

        let (started_tx, started_rx) = oneshot::channel();
        // Bound to a named (underscore-prefixed) variable rather than `_` so the receiver stays
        // alive until the end of the test.
        let (collected_tx, _collected_rx) = oneshot::channel();

        let intent = Command::RescheduleIntent {
            context: RescheduleContext::empty(),
            reschedule_plan: None,
        };
        let notifier = Notifier {
            started: Some(started_tx),
            collected: Some(collected_tx),
        };
        let barrier = schedule::NewBarrier {
            database_id,
            command: Some((intent, vec![notifier])),
            span: tracing::Span::none(),
            checkpoint: false,
        };

        let outcome = resolve_reschedule_intent(env, HashMap::new(), Some(&database_info), barrier);

        assert!(matches!(outcome, Ok(None)));
        let start_result = started_rx.await.expect("started notifier dropped");
        assert!(start_result.is_err());
    }
}
153
154impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
155 pub(super) async fn new_inner(
156 env: MetaSrvEnv,
157 request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
158 context: Arc<C>,
159 ) -> Self {
160 let enable_recovery = env.opts.enable_recovery;
161
162 let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();
163
164 let partial_graph_manager = PartialGraphManager::uninitialized(env.clone());
165
166 let reader = env.system_params_reader().await;
167 let system_enable_per_database_isolation = reader.per_database_isolation();
168 let periodic_barriers = PeriodicBarriers::default();
170
171 let checkpoint_control = CheckpointControl::new(env.clone());
172 Self {
173 enable_recovery,
174 periodic_barriers,
175 system_enable_per_database_isolation,
176 context,
177 env,
178 checkpoint_control,
179 completing_task: CompletingTask::None,
180 request_rx,
181 active_streaming_nodes,
182 partial_graph_manager,
183 }
184 }
185}
186
187fn resolve_reschedule_intent(
188 env: MetaSrvEnv,
189 worker_nodes: HashMap<WorkerId, WorkerNode>,
190 database_info: Option<&InflightDatabaseInfo>,
191 mut new_barrier: schedule::NewBarrier,
192) -> MetaResult<Option<schedule::NewBarrier>> {
193 let Some((command, notifiers)) = new_barrier.command.take() else {
194 return Ok(Some(new_barrier));
195 };
196
197 match command {
198 Command::RescheduleIntent {
199 context,
200 reschedule_plan,
201 } => {
202 if let Some(reschedule_plan) = reschedule_plan {
203 new_barrier.command = Some((
204 Command::RescheduleIntent {
205 context,
206 reschedule_plan: Some(reschedule_plan),
207 },
208 notifiers,
209 ));
210 return Ok(Some(new_barrier));
211 }
212 let span = tracing::info_span!(
213 "resolve_reschedule_intent",
214 database_id = %new_barrier.database_id
215 );
216 let reschedule_plan = {
217 let _guard = span.enter();
218 build_reschedule_from_context(
219 &env,
220 worker_nodes,
221 new_barrier.database_id,
222 context,
223 database_info.ok_or_else(|| {
224 anyhow!(
225 "database {} not found when resolving reschedule intent",
226 new_barrier.database_id
227 )
228 })?,
229 )
230 };
231 match reschedule_plan {
232 Ok(Some(reschedule_plan)) => {
233 new_barrier.command = Some((
234 Command::RescheduleIntent {
235 context: RescheduleContext::empty(),
236 reschedule_plan: Some(reschedule_plan),
237 },
238 notifiers,
239 ));
240 Ok(Some(new_barrier))
241 }
242 Ok(None) => {
243 for mut notifier in notifiers {
245 notifier.notify_started();
246 notifier.notify_collected();
247 }
248 Ok(None)
249 }
250 Err(err) => {
251 for notifier in notifiers {
252 notifier.notify_start_failed(err.clone());
253 }
254 Ok(None)
255 }
256 }
257 }
258 _ => {
259 new_barrier.command = Some((command, notifiers));
260 Ok(Some(new_barrier))
261 }
262 }
263}
264
265fn build_reschedule_from_context(
266 env: &MetaSrvEnv,
267 worker_nodes: HashMap<WorkerId, WorkerNode>,
268 database_id: DatabaseId,
269 context: RescheduleContext,
270 database_info: &InflightDatabaseInfo,
271) -> MetaResult<Option<crate::barrier::ReschedulePlan>> {
272 if worker_nodes.is_empty() {
273 return Err(anyhow!("no active streaming workers for reschedule").into());
274 }
275
276 if context.is_empty() {
277 return Ok(None);
278 }
279
280 let all_prev_fragments = database_info
283 .fragment_infos()
284 .map(|fragment| (fragment.fragment_id, fragment))
285 .collect();
286
287 let previewed = preview_actor_assignments(&worker_nodes, &context.loaded)?;
288
289 if rendered_layout_matches_current(&previewed.fragments, &all_prev_fragments)? {
290 return Ok(None);
291 }
292
293 let actor_id_counter = env.actor_id_generator();
294 let rendered = materialize_actor_assignments(actor_id_counter, previewed);
297 let mut commands = build_reschedule_commands(rendered.fragments, context, all_prev_fragments)?;
298 Ok(commands.remove(&database_id))
299}
300
301impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl> {
302 #[expect(clippy::too_many_arguments)]
304 pub async fn new(
305 scheduled_barriers: schedule::ScheduledBarriers,
306 env: MetaSrvEnv,
307 metadata_manager: MetadataManager,
308 hummock_manager: HummockManagerRef,
309 source_manager: SourceManagerRef,
310 sink_manager: SinkCoordinatorManager,
311 iceberg_v3_sink_manager: IcebergV3SinkManager,
312 scale_controller: ScaleControllerRef,
313 request_rx: mpsc::UnboundedReceiver<BarrierManagerRequest>,
314 barrier_scheduler: schedule::BarrierScheduler,
315 refresh_manager: GlobalRefreshManagerRef,
316 ) -> Self {
317 let status = Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting)));
318
319 let context = Arc::new(GlobalBarrierWorkerContextImpl::new(
320 scheduled_barriers,
321 status,
322 metadata_manager,
323 hummock_manager,
324 source_manager,
325 scale_controller,
326 env.clone(),
327 barrier_scheduler,
328 refresh_manager,
329 sink_manager,
330 iceberg_v3_sink_manager,
331 ));
332
333 Self::new_inner(env, request_rx, context).await
334 }
335
336 pub fn start(self) -> (JoinHandle<()>, Sender<()>) {
337 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
338 let fut = (self.env.await_tree_reg())
339 .register_derived_root("Global Barrier Worker")
340 .instrument(self.run(shutdown_rx));
341 let join_handle = tokio::spawn(fut);
342
343 (join_handle, shutdown_tx)
344 }
345
346 async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
348 let paused = self
349 .env
350 .system_params_reader()
351 .await
352 .pause_on_next_bootstrap()
353 || self.env.opts.pause_on_next_bootstrap_offline;
354
355 if paused {
356 warn!(
357 "The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
358 It will now be reset to `false`. \
359 To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
360 PAUSE_ON_NEXT_BOOTSTRAP_KEY
361 );
362 self.env
363 .system_params_manager_impl_ref()
364 .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
365 .await?;
366 }
367 Ok(paused)
368 }
369
370 async fn run(mut self, shutdown_rx: Receiver<()>) {
372 tracing::info!(
373 "Starting barrier manager with: enable_recovery={}, in_flight_barrier_nums={}",
374 self.enable_recovery,
375 self.checkpoint_control.in_flight_barrier_nums,
376 );
377
378 if !self.enable_recovery {
379 let job_exist = self
380 .context
381 .metadata_manager
382 .catalog_controller
383 .has_any_streaming_jobs()
384 .await
385 .unwrap();
386 if job_exist {
387 panic!(
388 "Some streaming jobs already exist in meta, please start with recovery enabled \
389 or clean up the metadata using `./risedev clean-data`"
390 );
391 }
392 }
393
394 {
395 let span = tracing::info_span!("bootstrap_recovery");
400 crate::telemetry::report_event(
401 risingwave_pb::telemetry::TelemetryEventStage::Recovery,
402 "normal_recovery",
403 0,
404 None,
405 None,
406 None,
407 );
408
409 let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
410
411 self.recovery(paused, RecoveryReason::Bootstrap)
412 .instrument(span)
413 .await;
414 }
415
416 Box::pin(self.run_inner(shutdown_rx)).await
417 }
418}
419
420impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
421 fn enable_per_database_isolation(&self) -> bool {
422 self.system_enable_per_database_isolation && {
423 if let Err(e) =
424 risingwave_common::license::Feature::DatabaseFailureIsolation.check_available()
425 {
426 warn!(error = %e.as_report(), "DatabaseFailureIsolation disabled by license");
427 false
428 } else {
429 true
430 }
431 }
432 }
433
434 pub(super) async fn run_inner(mut self, mut shutdown_rx: Receiver<()>) {
435 let (local_notification_tx, mut local_notification_rx) =
436 tokio::sync::mpsc::unbounded_channel();
437 self.env
438 .notification_manager()
439 .insert_local_sender(local_notification_tx);
440
441 loop {
443 tokio::select! {
444 biased;
445
446 _ = &mut shutdown_rx => {
448 tracing::info!("Barrier manager is stopped");
449 break;
450 }
451
452 request = self.request_rx.recv() => {
453 if let Some(request) = request {
454 match request {
455 BarrierManagerRequest::GetBackfillProgress(result_tx) => {
456 let progress = self.checkpoint_control.gen_backfill_progress();
457 if result_tx.send(Ok(progress)).is_err() {
458 error!("failed to send get ddl progress");
459 }
460 }
461 BarrierManagerRequest::GetFragmentBackfillProgress(result_tx) => {
462 let progress =
463 self.checkpoint_control.gen_fragment_backfill_progress();
464 if result_tx.send(Ok(progress)).is_err() {
465 error!("failed to send get fragment backfill progress");
466 }
467 }
468 BarrierManagerRequest::GetCdcProgress(result_tx) => {
469 let progress = self.checkpoint_control.gen_cdc_progress();
470 if result_tx.send(Ok(progress)).is_err() {
471 error!("failed to send get ddl progress");
472 }
473 }
474 BarrierManagerRequest::AdhocRecovery(sender) => {
476 self.adhoc_recovery().await;
477 if sender.send(()).is_err() {
478 warn!("failed to notify finish of adhoc recovery");
479 }
480 }
481 BarrierManagerRequest::UpdateDatabaseBarrier( UpdateDatabaseBarrierRequest {
482 database_id,
483 barrier_interval_ms,
484 checkpoint_frequency,
485 sender,
486 }) => {
487 self.periodic_barriers
488 .update_database_barrier(
489 database_id,
490 barrier_interval_ms,
491 checkpoint_frequency,
492 );
493 if sender.send(()).is_err() {
494 warn!("failed to notify finish of update database barrier");
495 }
496 }
497 BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
498 if tx.send(self.checkpoint_control.may_have_snapshot_backfilling_jobs()).is_err() {
499 warn!("failed to may have snapshot backfill job");
500 }
501 }
502 }
503 } else {
504 tracing::info!("end of request stream. meta node may be shutting down. Stop global barrier manager");
505 return;
506 }
507 }
508
509 changed_worker = self.active_streaming_nodes.changed() => {
510 #[cfg(debug_assertions)]
511 {
512 self.active_streaming_nodes.validate_change().await;
513 }
514
515 info!(?changed_worker, "worker changed");
516
517 match changed_worker {
518 ActiveStreamingWorkerChange::Add(node)
519 | ActiveStreamingWorkerChange::Update(node) => {
520 self.partial_graph_manager
521 .add_worker(node, self.context.clone())
522 .await;
523 }
524 ActiveStreamingWorkerChange::Remove(node) => {
525 self.partial_graph_manager.remove_worker(node);
526 }
527 }
528 }
529
530 notification = local_notification_rx.recv() => {
531 let notification = notification.unwrap();
532 if let LocalNotification::SystemParamsChange(p) = notification {
533 {
534 self.periodic_barriers.set_sys_barrier_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
535 self.periodic_barriers
536 .set_sys_checkpoint_frequency(p.checkpoint_frequency());
537 self.system_enable_per_database_isolation = p.per_database_isolation();
538 }
539 }
540 }
541 complete_result = self
542 .completing_task
543 .next_completed_barrier(
544 &mut self.periodic_barriers,
545 &mut self.checkpoint_control,
546 &mut self.partial_graph_manager,
547 &self.context,
548 &self.env,
549 ) => {
550 match complete_result {
551 Ok(output) => {
552 self.checkpoint_control.ack_completed(&mut self.partial_graph_manager, output);
553 }
554 Err(e) => {
555 self.failure_recovery(e).await;
556 }
557 }
558 },
559 event = self.checkpoint_control.next_event() => {
560 let result: MetaResult<()> = try {
561 match event {
562 CheckpointControlEvent::EnteringInitializing(entering_initializing) => {
563 let database_id = entering_initializing.database_id();
564 let error = merge_node_rpc_errors(&format!("database {} reset", database_id), entering_initializing.action.0.iter().filter_map(|(worker_id, resp)| {
565 resp.root_err.as_ref().map(|root_err| {
566 (*worker_id, ScoredError {
567 error: Status::internal(&root_err.err_msg),
568 score: Score(root_err.score)
569 })
570 })
571 }));
572 Self::report_collect_failure(&self.env, &error);
573 self.context.notify_creating_job_failed(Some(database_id), format!("{}", error.as_report())).await;
574 let result: MetaResult<_> = try {
575 let runtime_info = self.context.reload_database_runtime_info(database_id).await.inspect_err(|err| {
576 warn!(%database_id, err = %err.as_report(), "reload runtime info failed");
577 })?;
578 let rendered_info = render_runtime_info(
579 self.env.actor_id_generator(),
580 &self.active_streaming_nodes,
581 &runtime_info.recovery_context,
582 database_id,
583 )
584 .inspect_err(|err: &MetaError| {
585 warn!(%database_id, err = %err.as_report(), "render runtime info failed");
586 })?;
587 if let Some(rendered_info) = rendered_info {
588 BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
589 database_id,
590 &rendered_info.job_infos,
591 &self.active_streaming_nodes,
592 &rendered_info.stream_actors,
593 &runtime_info.state_table_committed_epochs,
594 )
595 .inspect_err(|err| {
596 warn!(%database_id, err = ?err.as_report(), "database runtime info failed validation");
597 })?;
598 Some((runtime_info, rendered_info))
599 } else {
600 None
601 }
602 };
603 match result {
604 Ok(Some((runtime_info, rendered_info))) => {
605 entering_initializing.enter(
606 runtime_info,
607 rendered_info,
608 &mut self.partial_graph_manager,
609 );
610 }
611 Ok(None) => {
612 info!(%database_id, "database removed after reloading empty runtime info");
613 self.context.mark_ready(MarkReadyOptions::Database(database_id));
615 entering_initializing.remove();
616 }
617 Err(e) => {
618 entering_initializing.fail_reload_runtime_info(e);
619 }
620 }
621 }
622 CheckpointControlEvent::EnteringRunning(entering_running) => {
623 self.context.mark_ready(MarkReadyOptions::Database(entering_running.database_id()));
624 entering_running.enter();
625 }
626 CheckpointControlEvent::BatchRefreshTrigger { database_id, job_id } => {
627 self.handle_batch_refresh_trigger(database_id, job_id).await?;
628 }
629 }
630 };
631 if let Err(e) = result {
632 self.failure_recovery(e).await;
633 }
634 }
635 event = self.partial_graph_manager.next_event(&self.context) => {
636 let result: MetaResult<()> = try {
637 match event {
638 PartialGraphManagerEvent::Worker(_worker_id, WorkerEvent::WorkerConnected) => {
639 }
641 PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
642 let failed_databases = self
643 .checkpoint_control
644 .databases_failed_at_worker_err(worker_id)
645 .chain(
646 affected_partial_graphs
647 .into_iter()
648 .map(|partial_graph_id| {
649 let (database_id, _) = from_partial_graph_id(partial_graph_id);
650 database_id
651 })
652 )
653 .collect::<HashSet<_>>();
654 if !failed_databases.is_empty() {
655 if !self.enable_recovery {
656 panic!("control stream to worker {} failed but recovery not enabled: {}", worker_id, err.as_report());
657 }
658 if !self.enable_per_database_isolation() {
659 Err(err.clone())?;
660 }
661 Self::report_collect_failure(&self.env, &err);
662 for database_id in failed_databases {
663 if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
664 warn!(%worker_id, %database_id, "database entering recovery on node failure");
665 self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
666 self.context.notify_creating_job_failed(Some(database_id), format!("database {} reset due to node {} failure: {}", database_id, worker_id, err.as_report())).await;
667 let output = self.completing_task.wait_completing_task().await?;
669 entering_recovery.enter(output, &mut self.partial_graph_manager);
670 }
671 }
672 } else {
673 warn!(%worker_id, "no barrier to collect from worker, ignore err");
674 }
675 continue;
676 }
677 PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
678 let (database_id, _creating_job_id) = from_partial_graph_id(partial_graph_id);
679 match event {
680 PartialGraphEvent::BarrierCollected(collected_barrier) => {
681 self.checkpoint_control.barrier_collected(partial_graph_id, collected_barrier, &mut self.periodic_barriers)?;
682 }
683 PartialGraphEvent::Error(worker_id) => {
684 if !self.enable_recovery {
685 panic!("database {database_id} failure reported from {worker_id} but recovery not enabled")
686 }
687 if !self.enable_per_database_isolation() {
688 Err(MetaError::from(anyhow!("database {database_id} report failure from {worker_id}")))?;
689 }
690 if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
691 warn!(%database_id, "database entering recovery");
692 self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!("reset database: {}", database_id).into()));
693 let output = self.completing_task.wait_completing_task().await?;
695 entering_recovery.enter(output, &mut self.partial_graph_manager);
696 }
697 }
698 PartialGraphEvent::Reset(reset_resps) => {
699 self.checkpoint_control.on_partial_graph_reset(partial_graph_id, reset_resps);
700 }
701 PartialGraphEvent::Initialized => {
702 self.checkpoint_control.on_partial_graph_initialized(
703 partial_graph_id,
704 &mut self.partial_graph_manager,
705 )?;
706 }
707 }
708 }
709 };
710 };
711 if let Err(e) = result {
712 self.failure_recovery(e).await;
713 }
714 }
715 new_barrier = self.periodic_barriers.next_barrier(&*self.context) => {
716 let database_id = new_barrier.database_id;
717 let new_barrier = if matches!(
718 new_barrier.command,
719 Some((Command::RescheduleIntent { .. }, _))
720 ) {
721 let env = self.env.clone();
722 let worker_nodes = self
723 .active_streaming_nodes
724 .current()
725 .iter()
726 .map(|(worker_id, worker)| (*worker_id, worker.clone()))
727 .collect();
728 let database_info = self.checkpoint_control.database_info(database_id);
729 match resolve_reschedule_intent(
730 env,
731 worker_nodes,
732 database_info,
733 new_barrier,
734 ) {
735 Ok(Some(new_barrier)) => new_barrier,
736 Ok(None) => continue,
737 Err(err) => {
738 self.failure_recovery(err).await;
739 continue;
740 }
741 }
742 } else {
743 new_barrier
744 };
745 if let Err(e) = self.checkpoint_control.handle_new_barrier(
746 new_barrier,
747 &mut self.partial_graph_manager,
748 self.active_streaming_nodes.current()
749 ) {
750 if !self.enable_recovery {
751 panic!(
752 "failed to inject barrier to some databases but recovery not enabled: {:?}", (
753 database_id,
754 e.as_report()
755 )
756 );
757 }
758 let result: MetaResult<_> = try {
759 if !self.enable_per_database_isolation() {
760 let err = anyhow!("failed to inject barrier to databases: {:?}", (database_id, e.as_report()));
761 Err(MetaError::from(err))?;
762 } else if let Some(entering_recovery) = self.checkpoint_control.on_report_failure(database_id, &mut self.partial_graph_manager) {
763 warn!(%database_id, e = %e.as_report(),"database entering recovery on inject failure");
764 self.context.abort_and_mark_blocked(Some(database_id), RecoveryReason::Failover(anyhow!(e).context("inject barrier failure").into()));
765 let output = self.completing_task.wait_completing_task().await?;
767 entering_recovery.enter(output, &mut self.partial_graph_manager);
768 }
769 };
770 if let Err(e) = result {
771 self.failure_recovery(e).await;
772 }
773 }
774 }
775 }
776 }
777 }
778}
779
780impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
781 pub async fn clear_on_err(&mut self, err: &MetaError) {
783 match replace(&mut self.completing_task, CompletingTask::None) {
785 CompletingTask::None | CompletingTask::Err(_) => {}
786 CompletingTask::Completing {
787 epochs_to_ack,
788 join_handle,
789 ..
790 } => {
791 info!("waiting for completing command to finish in recovery");
792 match join_handle.await {
793 Err(e) => {
794 warn!(err = %e.as_report(), "failed to join completing task");
795 }
796 Ok(Err(e)) => {
797 warn!(
798 err = %e.as_report(),
799 "failed to complete barrier during clear"
800 );
801 }
802 Ok(Ok(hummock_version_stats)) => {
803 self.checkpoint_control.ack_completed(
804 &mut self.partial_graph_manager,
805 BarrierCompleteOutput {
806 epochs_to_ack,
807 hummock_version_stats,
808 },
809 );
810 }
811 }
812 }
813 };
814 self.partial_graph_manager.notify_all_err(err);
815 }
816}
817
818impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
819 async fn handle_batch_refresh_trigger(
822 &mut self,
823 database_id: DatabaseId,
824 job_id: JobId,
825 ) -> MetaResult<()> {
826 let last_committed_epoch = self
828 .checkpoint_control
829 .get_batch_refresh_trigger_info(database_id, job_id);
830
831 let context = self
833 .context
834 .load_batch_refresh_trigger_context(job_id, database_id, last_committed_epoch)
835 .await?;
836
837 let started = self.checkpoint_control.start_batch_refresh_run(
839 database_id,
840 job_id,
841 &context,
842 self.active_streaming_nodes.current(),
843 self.env.actor_id_generator(),
844 &mut self.partial_graph_manager,
845 )?;
846
847 if started {
849 self.checkpoint_control
850 .apply_batch_refresh_fragment_infos(database_id, job_id);
851 }
852
853 Ok(())
854 }
855}
856
857impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
858 async fn failure_recovery(&mut self, err: MetaError) {
860 self.clear_on_err(&err).await;
861
862 if self.enable_recovery {
863 let span = tracing::info_span!(
864 "failure_recovery",
865 error = %err.as_report(),
866 );
867
868 crate::telemetry::report_event(
869 risingwave_pb::telemetry::TelemetryEventStage::Recovery,
870 "failure_recovery",
871 0,
872 None,
873 None,
874 None,
875 );
876
877 let reason = RecoveryReason::Failover(err);
878
879 self.recovery(false, reason).instrument(span).await;
882 } else {
883 panic!(
884 "a streaming error occurred while recovery is disabled, aborting: {:?}",
885 err.as_report()
886 );
887 }
888 }
889
890 async fn adhoc_recovery(&mut self) {
891 let err = MetaErrorInner::AdhocRecovery.into();
892 self.clear_on_err(&err).await;
893
894 let span = tracing::info_span!(
895 "adhoc_recovery",
896 error = %err.as_report(),
897 );
898
899 crate::telemetry::report_event(
900 risingwave_pb::telemetry::TelemetryEventStage::Recovery,
901 "adhoc_recovery",
902 0,
903 None,
904 None,
905 None,
906 );
907
908 self.recovery(false, RecoveryReason::Adhoc)
911 .instrument(span)
912 .await;
913 }
914}
915
916impl<C> GlobalBarrierWorker<C> {
917 pub(super) fn report_collect_failure(env: &MetaSrvEnv, error: &MetaError) {
919 use risingwave_pb::meta::event_log;
921 let event = event_log::EventCollectBarrierFail {
922 error: error.to_report_string(),
923 };
924 env.event_log_manager_ref()
925 .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
926 }
927}
928
/// Backoff helpers used to retry global recovery.
mod retry_strategy {
    use std::time::Duration;

    use tokio_retry::strategy::{ExponentialBackoff, jitter};

    /// Base passed to `ExponentialBackoff::from_millis`, in milliseconds.
    /// NOTE(review): per tokio-retry docs, delays grow as powers of this base
    /// (20 ms, 400 ms, ...) rather than by doubling; confirm this is intended.
    const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
    /// Upper bound applied to each backoff delay (before jitter).
    const RECOVERY_RETRY_MAX_INTERVAL: Duration = Duration::from_secs(5);

    /// A boxed, pinned sleep future representing one backoff wait.
    pub(crate) type RetryBackoffFuture = std::pin::Pin<Box<tokio::time::Sleep>>;

    /// Wraps `tokio::time::sleep(duration)` as a `RetryBackoffFuture`.
    pub(crate) fn get_retry_backoff_future(duration: Duration) -> RetryBackoffFuture {
        Box::pin(tokio::time::sleep(duration))
    }

    /// Opaque iterator of backoff sleep futures, produced by `get_retry_backoff_strategy`.
    pub(crate) type RetryBackoffStrategy =
        impl Iterator<Item = RetryBackoffFuture> + Send + 'static;

    /// Exponential backoff delays starting at `RECOVERY_RETRY_BASE_INTERVAL` ms, capped at
    /// `RECOVERY_RETRY_MAX_INTERVAL`, each randomized with tokio-retry's `jitter`.
    #[inline(always)]
    pub(crate) fn get_retry_strategy() -> impl Iterator<Item = Duration> + Send + 'static {
        ExponentialBackoff::from_millis(RECOVERY_RETRY_BASE_INTERVAL)
            .max_delay(RECOVERY_RETRY_MAX_INTERVAL)
            .map(jitter)
    }

    /// Same schedule as `get_retry_strategy`, with each delay turned into a sleep future.
    #[define_opaque(RetryBackoffStrategy)]
    pub(crate) fn get_retry_backoff_strategy() -> RetryBackoffStrategy {
        get_retry_strategy().map(get_retry_backoff_future)
    }
}
979
980pub(crate) use retry_strategy::*;
981use risingwave_common::error::tonic::extra::{Score, ScoredError};
982use risingwave_pb::meta::event_log::{Event, EventRecovery};
983
984use crate::barrier::partial_graph::{
985 PartialGraphEvent, PartialGraphManager, PartialGraphManagerEvent, WorkerEvent,
986};
987
988impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {
989 pub async fn recovery(&mut self, is_paused: bool, recovery_reason: RecoveryReason) {
997 self.partial_graph_manager.clear_worker();
999
1000 let reason_str = match &recovery_reason {
1001 RecoveryReason::Bootstrap => "bootstrap".to_owned(),
1002 RecoveryReason::Failover(err) => {
1003 format!("failed over: {}", err.as_report())
1004 }
1005 RecoveryReason::Adhoc => "adhoc recovery".to_owned(),
1006 };
1007 self.context.abort_and_mark_blocked(None, recovery_reason);
1008
1009 self.recovery_inner(is_paused, reason_str).await;
1010 self.context.mark_ready(MarkReadyOptions::Global {
1011 blocked_databases: self.checkpoint_control.recovering_databases().collect(),
1012 });
1013 }
1014
1015 #[await_tree::instrument("recovery({recovery_reason})")]
1016 async fn recovery_inner(&mut self, is_paused: bool, recovery_reason: String) {
1017 let event_log_manager_ref = self.env.event_log_manager_ref();
1018
1019 tracing::info!("recovery start!");
1020 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1021 EventRecovery::global_recovery_start(recovery_reason.clone()),
1022 )]);
1023
1024 let retry_strategy = get_retry_strategy();
1025
1026 let recovery_timer = GLOBAL_META_METRICS
1029 .recovery_latency
1030 .with_label_values(&["global"])
1031 .start_timer();
1032
1033 let enable_per_database_isolation = self.enable_per_database_isolation();
1034
1035 let recovery_future = tokio_retry::Retry::spawn(retry_strategy, || async {
1036 self.env.stream_client_pool().invalidate_all();
1037 self.context
1042 .notify_creating_job_failed(None, recovery_reason.clone())
1043 .await;
1044
1045 let runtime_info_snapshot = self
1046 .context
1047 .reload_runtime_info()
1048 .await?;
1049 let BarrierWorkerRuntimeInfoSnapshot {
1050 active_streaming_nodes,
1051 recovery_context,
1052 mut state_table_committed_epochs,
1053 mut state_table_log_epochs,
1054 mut mv_depended_subscriptions,
1055 mut background_jobs,
1056 hummock_version_stats,
1057 database_infos,
1058 mut cdc_table_snapshot_splits,
1059 } = runtime_info_snapshot;
1060
1061 let mut partial_graph_manager = PartialGraphManager::recover(
1062 self.env.clone(),
1063 active_streaming_nodes.current(),
1064 self.context.clone(),
1065 )
1066 .await;
1067 {
1068 let mut empty_databases = HashSet::new();
1069 let mut collected_databases = HashMap::new();
1070 let mut collecting_databases = HashMap::new();
1071 let mut failed_databases = HashMap::new();
1072 for &database_id in recovery_context.fragment_context.database_map.keys() {
1073 let mut recoverer = partial_graph_manager.start_recover();
1074 let result: MetaResult<_> = try {
1075 let Some(rendered_info) = render_runtime_info(
1076 self.env.actor_id_generator(),
1077 &active_streaming_nodes,
1078 &recovery_context,
1079 database_id,
1080 )
1081 .inspect_err(|err: &MetaError| {
1082 warn!(%database_id, err = %err.as_report(), "render runtime info failed");
1083 })? else {
1084 empty_databases.insert(database_id);
1085 continue;
1086 };
1087 BarrierWorkerRuntimeInfoSnapshot::validate_database_info(
1088 database_id,
1089 &rendered_info.job_infos,
1090 &active_streaming_nodes,
1091 &rendered_info.stream_actors,
1092 &state_table_committed_epochs,
1093 )
1094 .inspect_err(|err| {
1095 warn!(%database_id, err = %err.as_report(), "rendered runtime info failed validation");
1096 })?;
1097 let RenderedDatabaseRuntimeInfo {
1098 job_infos,
1099 stream_actors,
1100 mut source_splits,
1101 batch_refresh,
1102 } = rendered_info;
1103 recoverer.inject_database_initial_barrier(
1104 database_id,
1105 job_infos,
1106 &recovery_context.job_extra_info,
1107 &mut state_table_committed_epochs,
1108 &mut state_table_log_epochs,
1109 &recovery_context.fragment_relations,
1110 &stream_actors,
1111 &mut source_splits,
1112 &mut background_jobs,
1113 &mut mv_depended_subscriptions,
1114 is_paused,
1115 &hummock_version_stats,
1116 &mut cdc_table_snapshot_splits,
1117 batch_refresh,
1118 )?
1119 };
1120 let collector = match result {
1121 Ok(database) => {
1122 DatabaseInitialBarrierCollector {
1123 database_id,
1124 initializing_partial_graphs: recoverer.all_initializing(),
1125 database,
1126 }
1127 }
1128 Err(e) => {
1129 warn!(%database_id, e = %e.as_report(), "failed to inject database initial barrier");
1130 assert!(failed_databases.insert(database_id, recoverer.failed()).is_none(), "non-duplicate");
1131 continue;
1132 }
1133 };
1134 if !collector.is_collected() {
1135 assert!(collecting_databases.insert(database_id, collector).is_none());
1136 } else {
1137 warn!(%database_id, "database has no node to inject initial barrier");
1138 assert!(collected_databases.insert(database_id, collector.finish()).is_none());
1139 }
1140 }
1141 if !empty_databases.is_empty() {
1142 info!(?empty_databases, "empty database in global recovery");
1143 }
1144 while !collecting_databases.is_empty() {
1145 match partial_graph_manager.next_event(&self.context).await {
1146 PartialGraphManagerEvent::Worker(_, WorkerEvent::WorkerConnected) => {
1147 }
1149 PartialGraphManagerEvent::Worker(worker_id, WorkerEvent::WorkerError { err, affected_partial_graphs }) => {
1150 let affected_databases: HashSet<_> = affected_partial_graphs.into_iter().map(|partial_graph_id| {
1151 let (database_id, _) = from_partial_graph_id(partial_graph_id);
1152 database_id
1153 }).collect();
1154 warn!(%worker_id, err = %err.as_report(), "worker node failure during recovery");
1155 for (failed_database_id, collector) in collecting_databases.extract_if(|database_id, collector| {
1156 !collector.is_valid_after_worker_err(worker_id) || affected_databases.contains(database_id)
1157 }) {
1158 warn!(%failed_database_id, %worker_id, "database failed to recovery in global recovery due to worker node err");
1159 let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
1160 partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1161 assert!(failed_databases.insert(failed_database_id, resetting_partial_graphs).is_none());
1162 }
1163 }
1164 PartialGraphManagerEvent::PartialGraph(partial_graph_id, event) => {
1165 match event {
1166 PartialGraphEvent::BarrierCollected(_) => {
1167 unreachable!("no barrier collected event on initializing")
1168 }
1169 PartialGraphEvent::Reset(_) => {
1170 unreachable!("no partial graph reset on initializing")
1171 }
1172 PartialGraphEvent::Error(worker_id) => {
1173 let (database_id, _) = from_partial_graph_id(partial_graph_id);
1174 if let Some(collector) = collecting_databases.remove(&database_id) {
1175 warn!(%database_id, %worker_id, "database reset during global recovery");
1176 let resetting_partial_graphs: HashSet<_> = collector.all_partial_graphs().collect();
1177 partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1178 assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
1179 } else if let Some(database) = collected_databases.remove(&database_id) {
1180 warn!(%database_id, %worker_id, "database initialized but later reset during global recovery");
1181 let resetting_partial_graphs: HashSet<_> = database_partial_graphs(database_id, database.independent_checkpoint_job_controls.keys().copied()).collect();
1182 partial_graph_manager.reset_partial_graphs(resetting_partial_graphs.iter().copied());
1183 assert!(failed_databases.insert(database_id, resetting_partial_graphs).is_none());
1184 } else {
1185 assert!(failed_databases.contains_key(&database_id));
1186 }
1187 }
1188 PartialGraphEvent::Initialized => {
1189 let (database_id, _) = from_partial_graph_id(partial_graph_id);
1190 if failed_databases.contains_key(&database_id) {
1191 assert!(!collecting_databases.contains_key(&database_id));
1192 continue;
1194 }
1195 let Entry::Occupied(mut entry) = collecting_databases.entry(database_id) else {
1196 unreachable!("should exist")
1197 };
1198 let collector = entry.get_mut();
1199 collector.partial_graph_initialized(partial_graph_id);
1200 if collector.is_collected() {
1201 let collector = entry.remove();
1202 assert!(collected_databases.insert(database_id, collector.finish()).is_none());
1203 }
1204 }
1205 }
1206 }
1207 }
1208 }
1209 debug!("collected initial barrier");
1210 if !background_jobs.is_empty() {
1211 warn!(job_ids = ?background_jobs.iter().collect_vec(), "unused recovered background mview in recovery");
1212 }
1213 if !mv_depended_subscriptions.is_empty() {
1214 warn!(?mv_depended_subscriptions, "unused subscription infos in recovery");
1215 }
1216 if !state_table_committed_epochs.is_empty() {
1217 warn!(?state_table_committed_epochs, "unused state table committed epoch in recovery");
1218 }
1219 if !enable_per_database_isolation && !failed_databases.is_empty() {
1220 return Err(anyhow!(
1221 "global recovery failed due to failure of databases {:?}",
1222 failed_databases.keys().collect_vec()).into()
1223 );
1224 }
1225 let checkpoint_control = CheckpointControl::recover(
1226 collected_databases,
1227 failed_databases,
1228 hummock_version_stats,
1229 self.env.clone(),
1230 );
1231
1232 let reader = self.env.system_params_reader().await;
1233 let checkpoint_frequency = reader.checkpoint_frequency();
1234 let barrier_interval = Duration::from_millis(reader.barrier_interval_ms() as u64);
1235 let periodic_barriers = PeriodicBarriers::new(
1236 barrier_interval,
1237 checkpoint_frequency,
1238 database_infos,
1239 );
1240
1241 Ok((
1242 active_streaming_nodes,
1243 partial_graph_manager,
1244 checkpoint_control,
1245 periodic_barriers,
1246 ))
1247 }
1248 }.inspect_err(|err: &MetaError| {
1249 tracing::error!(error = %err.as_report(), "recovery failed");
1250 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1251 EventRecovery::global_recovery_failure(recovery_reason.clone(), err.to_report_string()),
1252 )]);
1253 GLOBAL_META_METRICS.recovery_failure_cnt.with_label_values(&["global"]).inc();
1254 }))
1255 .instrument(tracing::info_span!("recovery_attempt"));
1256
1257 let mut recover_txs = vec![];
1258 let mut update_barrier_requests = vec![];
1259 pin_mut!(recovery_future);
1260 let mut request_rx_closed = false;
1261 let new_state = loop {
1262 select! {
1263 biased;
1264 new_state = &mut recovery_future => {
1265 break new_state.expect("Retry until recovery success.");
1266 }
1267 request = pin!(self.request_rx.recv()), if !request_rx_closed => {
1268 let Some(request) = request else {
1269 warn!("request rx channel closed during recovery");
1270 request_rx_closed = true;
1271 continue;
1272 };
1273 match request {
1274 BarrierManagerRequest::GetBackfillProgress(tx) => {
1275 let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1276 }
1277 BarrierManagerRequest::GetFragmentBackfillProgress(tx) => {
1278 let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1279 }
1280 BarrierManagerRequest::GetCdcProgress(tx) => {
1281 let _ = tx.send(Err(anyhow!("cluster under recovery[{}]", recovery_reason).into()));
1282 }
1283 BarrierManagerRequest::AdhocRecovery(tx) => {
1284 recover_txs.push(tx);
1285 }
1286 BarrierManagerRequest::UpdateDatabaseBarrier(request) => {
1287 update_barrier_requests.push(request);
1288 }
1289 BarrierManagerRequest::MayHaveSnapshotBackfillingJob(tx) => {
1290 let _ = tx.send(true);
1292 }
1293 }
1294 }
1295 }
1296 };
1297
1298 let duration = recovery_timer.stop_and_record();
1299
1300 (
1301 self.active_streaming_nodes,
1302 self.partial_graph_manager,
1303 self.checkpoint_control,
1304 self.periodic_barriers,
1305 ) = new_state;
1306
1307 tracing::info!("recovery success");
1308
1309 for UpdateDatabaseBarrierRequest {
1310 database_id,
1311 barrier_interval_ms,
1312 checkpoint_frequency,
1313 sender,
1314 } in update_barrier_requests
1315 {
1316 self.periodic_barriers.update_database_barrier(
1317 database_id,
1318 barrier_interval_ms,
1319 checkpoint_frequency,
1320 );
1321 let _ = sender.send(());
1322 }
1323
1324 for tx in recover_txs {
1325 let _ = tx.send(());
1326 }
1327
1328 let recovering_databases = self
1329 .checkpoint_control
1330 .recovering_databases()
1331 .map(|database| database.as_raw_id())
1332 .collect_vec();
1333 let running_databases = self
1334 .checkpoint_control
1335 .running_databases()
1336 .map(|database| database.as_raw_id())
1337 .collect_vec();
1338
1339 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
1340 EventRecovery::global_recovery_success(
1341 recovery_reason.clone(),
1342 duration as f32,
1343 running_databases,
1344 recovering_databases,
1345 ),
1346 )]);
1347
1348 self.env
1349 .notification_manager()
1350 .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
1351 self.env
1352 .notification_manager()
1353 .notify_compute_without_version(Operation::Update, Info::Recovery(Recovery {}));
1354 }
1355}