risingwave_meta/barrier/checkpoint/
recovery.rs1use std::collections::{HashMap, HashSet};
16use std::mem::{replace, take};
17use std::task::{Context, Poll};
18
19use futures::FutureExt;
20use prometheus::{HistogramTimer, IntCounter};
21use risingwave_common::catalog::DatabaseId;
22use risingwave_meta_model::WorkerId;
23use risingwave_pb::id::PartialGraphId;
24use risingwave_pb::meta::event_log::{Event, EventRecovery};
25use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
26use thiserror_ext::AsReport;
27use tracing::{info, warn};
28
29use crate::barrier::DatabaseRuntimeInfoSnapshot;
30use crate::barrier::checkpoint::CheckpointControl;
31use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
32use crate::barrier::complete_task::BarrierCompleteOutput;
33use crate::barrier::context::recovery::RenderedDatabaseRuntimeInfo;
34use crate::barrier::partial_graph::PartialGraphManager;
35use crate::barrier::rpc::{DatabaseInitialBarrierCollector, to_partial_graph_id};
36use crate::barrier::worker::{
37 RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
38};
39use crate::rpc::metrics::GLOBAL_META_METRICS;
40use crate::{MetaError, MetaResult};
41
42enum DatabaseRecoveringStage {
71 Resetting {
72 resetting_partial_graphs: HashSet<PartialGraphId>,
73 reset_resps: Vec<(WorkerId, ResetPartialGraphResponse)>,
74 backoff_future: Option<RetryBackoffFuture>,
75 },
76 Initializing {
77 initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
78 },
79}
80
81pub(crate) struct DatabaseRecoveringState {
82 stage: DatabaseRecoveringStage,
83 retry_backoff_strategy: RetryBackoffStrategy,
84 metrics: DatabaseRecoveryMetrics,
85}
86
87pub(super) enum RecoveringStateAction {
88 EnterInitializing(Vec<(WorkerId, ResetPartialGraphResponse)>),
89 EnterRunning,
90}
91
92struct DatabaseRecoveryMetrics {
93 recovery_failure_cnt: IntCounter,
94 recovery_timer: Option<HistogramTimer>,
95}
96
97impl DatabaseRecoveryMetrics {
98 fn new(database_id: DatabaseId) -> Self {
99 let database_id_str = format!("database {}", database_id);
100 Self {
101 recovery_failure_cnt: GLOBAL_META_METRICS
102 .recovery_failure_cnt
103 .with_label_values(&[database_id_str.as_str()]),
104 recovery_timer: Some(
105 GLOBAL_META_METRICS
106 .recovery_latency
107 .with_label_values(&[database_id_str.as_str()])
108 .start_timer(),
109 ),
110 }
111 }
112}
113
114impl DatabaseRecoveringState {
115 pub(super) fn new_resetting(
116 database_id: DatabaseId,
117 resetting_partial_graphs: HashSet<PartialGraphId>,
118 ) -> Self {
119 let mut retry_backoff_strategy = get_retry_backoff_strategy();
120 let backoff_future = retry_backoff_strategy.next().unwrap();
121 let metrics = DatabaseRecoveryMetrics::new(database_id);
122 metrics.recovery_failure_cnt.inc();
123 Self {
124 stage: DatabaseRecoveringStage::Resetting {
125 resetting_partial_graphs,
126 reset_resps: vec![],
127 backoff_future: Some(backoff_future),
128 },
129 retry_backoff_strategy,
130 metrics,
131 }
132 }
133
134 fn next_retry(&mut self) -> RetryBackoffFuture {
135 self.retry_backoff_strategy
136 .next()
137 .expect("should not be empty")
138 }
139
140 pub(super) fn partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
141 match &mut self.stage {
142 DatabaseRecoveringStage::Resetting { .. } => {
143 unreachable!("should not have partial graph initialized when resetting")
144 }
145 DatabaseRecoveringStage::Initializing {
146 initial_barrier_collector,
147 } => {
148 initial_barrier_collector.partial_graph_initialized(partial_graph_id);
149 }
150 }
151 }
152
153 pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
154 match &mut self.stage {
155 DatabaseRecoveringStage::Resetting { .. } => true,
156 DatabaseRecoveringStage::Initializing {
157 initial_barrier_collector,
158 ..
159 } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
160 }
161 }
162
163 pub(super) fn on_partial_graph_reset(
164 &mut self,
165 partial_graph_id: PartialGraphId,
166 new_reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
167 ) {
168 match &mut self.stage {
169 DatabaseRecoveringStage::Resetting {
170 resetting_partial_graphs,
171 reset_resps,
172 ..
173 } => {
174 assert!(resetting_partial_graphs.remove(&partial_graph_id));
175 reset_resps.extend(new_reset_resps);
176 }
177 DatabaseRecoveringStage::Initializing { .. } => {
178 unreachable!("all reset resp should have been received in Resetting")
179 }
180 }
181 }
182
183 pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
184 match &mut self.stage {
185 DatabaseRecoveringStage::Resetting {
186 resetting_partial_graphs,
187 reset_resps,
188 backoff_future: backoff_future_option,
189 } => {
190 let pass_backoff = if let Some(backoff_future) = backoff_future_option {
191 if backoff_future.poll_unpin(cx).is_ready() {
192 *backoff_future_option = None;
193 true
194 } else {
195 false
196 }
197 } else {
198 true
199 };
200 if pass_backoff && resetting_partial_graphs.is_empty() {
201 return Poll::Ready(RecoveringStateAction::EnterInitializing(take(
202 reset_resps,
203 )));
204 }
205 }
206 DatabaseRecoveringStage::Initializing {
207 initial_barrier_collector,
208 ..
209 } => {
210 if initial_barrier_collector.is_collected() {
211 return Poll::Ready(RecoveringStateAction::EnterRunning);
212 }
213 }
214 }
215 Poll::Pending
216 }
217}
218
219pub(crate) struct DatabaseStatusAction<'a, A> {
220 control: &'a mut CheckpointControl,
221 database_id: DatabaseId,
222 pub(crate) action: A,
223}
224
225impl<A> DatabaseStatusAction<'_, A> {
226 pub(crate) fn database_id(&self) -> DatabaseId {
227 self.database_id
228 }
229}
230
231impl CheckpointControl {
232 pub(super) fn new_database_status_action<A>(
233 &mut self,
234 database_id: DatabaseId,
235 action: A,
236 ) -> DatabaseStatusAction<'_, A> {
237 DatabaseStatusAction {
238 control: self,
239 database_id,
240 action,
241 }
242 }
243}
244
/// Marker action: move a database into the `Resetting` recovery stage.
pub(crate) struct EnterReset;
246
247impl DatabaseStatusAction<'_, EnterReset> {
248 pub(crate) fn enter(
249 self,
250 barrier_complete_output: Option<BarrierCompleteOutput>,
251 partial_graph_manager: &mut PartialGraphManager,
252 ) {
253 let event_log_manager_ref = self.control.env.event_log_manager_ref();
254 if let Some(output) = barrier_complete_output {
255 self.control.ack_completed(output);
256 }
257 let database_status = self
258 .control
259 .databases
260 .get_mut(&self.database_id)
261 .expect("should exist");
262 match database_status {
263 DatabaseCheckpointControlStatus::Running(database) => {
264 let mut resetting_partial_graphs = HashSet::new();
265 let new_reset_partial_graphs: HashSet<_> = database
266 .creating_streaming_job_controls
267 .drain()
268 .filter_map(|(job_id, job)| {
269 let partial_graph_id = to_partial_graph_id(self.database_id, Some(job_id));
270 if job.reset() {
271 resetting_partial_graphs.insert(partial_graph_id);
272 None
273 } else {
274 Some(partial_graph_id)
275 }
276 })
277 .chain([to_partial_graph_id(self.database_id, None)])
278 .collect();
279 resetting_partial_graphs
280 .iter()
281 .for_each(|partial_graph_id| {
282 partial_graph_manager.assert_resetting(*partial_graph_id)
283 });
284 partial_graph_manager
285 .reset_partial_graphs(new_reset_partial_graphs.iter().copied());
286 resetting_partial_graphs.extend(new_reset_partial_graphs);
287
288 let metrics = DatabaseRecoveryMetrics::new(self.database_id);
289 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
290 EventRecovery::database_recovery_start(self.database_id.as_raw_id()),
291 )]);
292 *database_status =
293 DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
294 stage: DatabaseRecoveringStage::Resetting {
295 resetting_partial_graphs,
296 reset_resps: vec![],
297 backoff_future: None,
298 },
299 retry_backoff_strategy: get_retry_backoff_strategy(),
300 metrics,
301 });
302 }
303 DatabaseCheckpointControlStatus::Recovering(state) => match &mut state.stage {
304 DatabaseRecoveringStage::Resetting { .. } => {
305 unreachable!("should not enter resetting again")
306 }
307 DatabaseRecoveringStage::Initializing {
308 initial_barrier_collector,
309 } => {
310 let partial_graphs: HashSet<_> =
311 initial_barrier_collector.all_partial_graphs().collect();
312 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
313 EventRecovery::database_recovery_failure(self.database_id.as_raw_id()),
314 )]);
315 let backoff_future = state.next_retry();
316 partial_graph_manager.reset_partial_graphs(partial_graphs.iter().copied());
317 state.metrics.recovery_failure_cnt.inc();
318 state.stage = DatabaseRecoveringStage::Resetting {
319 resetting_partial_graphs: partial_graphs,
320 reset_resps: vec![],
321 backoff_future: Some(backoff_future),
322 };
323 }
324 },
325 }
326 }
327}
328
329impl CheckpointControl {
330 pub(crate) fn on_report_failure(
331 &mut self,
332 database_id: DatabaseId,
333 partial_graph_manager: &mut PartialGraphManager,
334 ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
335 let database_status = self.databases.get_mut(&database_id).expect("should exist");
336 match database_status {
337 DatabaseCheckpointControlStatus::Running(_) => {
338 Some(self.new_database_status_action(database_id, EnterReset))
339 }
340 DatabaseCheckpointControlStatus::Recovering(state) => match &mut state.stage {
341 DatabaseRecoveringStage::Resetting { .. } => {
342 None
344 }
345 DatabaseRecoveringStage::Initializing {
346 initial_barrier_collector,
347 } => {
348 warn!(database_id = %database_id, "failed to initialize database");
349 let partial_graphs: HashSet<_> =
350 initial_barrier_collector.all_partial_graphs().collect();
351 let backoff_future = state.next_retry();
352 partial_graph_manager.reset_partial_graphs(partial_graphs.iter().copied());
353 state.metrics.recovery_failure_cnt.inc();
354 state.stage = DatabaseRecoveringStage::Resetting {
355 resetting_partial_graphs: partial_graphs,
356 reset_resps: vec![],
357 backoff_future: Some(backoff_future),
358 };
359 None
360 }
361 },
362 }
363 }
364}
365
366pub(crate) struct EnterInitializing(pub(crate) Vec<(WorkerId, ResetPartialGraphResponse)>);
367
368impl DatabaseStatusAction<'_, EnterInitializing> {
369 pub(crate) fn enter(
370 self,
371 runtime_info: DatabaseRuntimeInfoSnapshot,
372 rendered_info: RenderedDatabaseRuntimeInfo,
373 partial_graph_manager: &mut PartialGraphManager,
374 ) {
375 let database_status = self
376 .control
377 .databases
378 .get_mut(&self.database_id)
379 .expect("should exist");
380 let status = match database_status {
381 DatabaseCheckpointControlStatus::Running(_) => {
382 unreachable!("should not enter initializing when running")
383 }
384 DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
385 DatabaseRecoveringStage::Initializing { .. } => {
386 unreachable!("can only enter initializing when resetting")
387 }
388 DatabaseRecoveringStage::Resetting { .. } => state,
389 },
390 };
391 let DatabaseRuntimeInfoSnapshot {
392 recovery_context,
393 mut state_table_committed_epochs,
394 mut state_table_log_epochs,
395 mut mv_depended_subscriptions,
396 mut background_jobs,
397 mut cdc_table_snapshot_splits,
398 } = runtime_info;
399 let fragment_relations = &recovery_context.fragment_relations;
400 let RenderedDatabaseRuntimeInfo {
401 job_infos,
402 stream_actors,
403 mut source_splits,
404 } = rendered_info;
405 let mut recoverer = partial_graph_manager.start_recover();
406 let result: MetaResult<_> = try {
407 recoverer.inject_database_initial_barrier(
408 self.database_id,
409 job_infos,
410 &recovery_context.job_extra_info,
411 &mut state_table_committed_epochs,
412 &mut state_table_log_epochs,
413 fragment_relations,
414 &stream_actors,
415 &mut source_splits,
416 &mut background_jobs,
417 &mut mv_depended_subscriptions,
418 false,
419 &self.control.hummock_version_stats,
420 &mut cdc_table_snapshot_splits,
421 )?
422 };
423 match result {
424 Ok(database) => {
425 let initializing_partial_graphs = recoverer.all_initializing();
426 info!(?initializing_partial_graphs, database_id = ?self.database_id, "database enter initializing");
427 status.stage = DatabaseRecoveringStage::Initializing {
428 initial_barrier_collector: DatabaseInitialBarrierCollector {
429 database_id: self.database_id,
430 initializing_partial_graphs,
431 database,
432 }
433 .into(),
434 };
435 }
436 Err(e) => {
437 warn!(
438 database_id = %self.database_id,
439 e = %e.as_report(),
440 "failed to inject initial barrier"
441 );
442 let backoff_future = status.next_retry();
443 let resetting_partial_graphs = recoverer.failed();
444 status.metrics.recovery_failure_cnt.inc();
445 status.stage = DatabaseRecoveringStage::Resetting {
446 resetting_partial_graphs,
447 reset_resps: vec![],
448 backoff_future: Some(backoff_future),
449 };
450 }
451 }
452 }
453
454 pub(crate) fn fail_reload_runtime_info(self, e: MetaError) {
455 let database_status = self
456 .control
457 .databases
458 .get_mut(&self.database_id)
459 .expect("should exist");
460 let status = match database_status {
461 DatabaseCheckpointControlStatus::Running(_) => {
462 unreachable!("should not enter initializing when running")
463 }
464 DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
465 DatabaseRecoveringStage::Initializing { .. } => {
466 unreachable!("can only enter initializing when resetting")
467 }
468 DatabaseRecoveringStage::Resetting { .. } => state,
469 },
470 };
471 warn!(
472 database_id = %self.database_id,
473 e = %e.as_report(),
474 "failed to reload runtime info"
475 );
476 let backoff_future = status.next_retry();
477 status.metrics.recovery_failure_cnt.inc();
478 status.stage = DatabaseRecoveringStage::Resetting {
479 resetting_partial_graphs: HashSet::new(),
480 reset_resps: vec![],
481 backoff_future: Some(backoff_future),
482 };
483 }
484
485 pub(crate) fn remove(self) {
486 self.control
487 .databases
488 .remove(&self.database_id)
489 .expect("should exist");
490 self.control
491 .env
492 .shared_actor_infos()
493 .remove_database(self.database_id);
494 }
495}
496
/// Marker action: move a database from the `Initializing` recovery stage to running.
pub(crate) struct EnterRunning;
498
499impl DatabaseStatusAction<'_, EnterRunning> {
500 pub(crate) fn enter(self) {
501 info!(database_id = ?self.database_id, "database enter running");
502 let event_log_manager_ref = self.control.env.event_log_manager_ref();
503 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
504 EventRecovery::database_recovery_success(self.database_id.as_raw_id()),
505 )]);
506 let database_status = self
507 .control
508 .databases
509 .get_mut(&self.database_id)
510 .expect("should exist");
511 match database_status {
512 DatabaseCheckpointControlStatus::Running(_) => {
513 unreachable!("should not enter running again")
514 }
515 DatabaseCheckpointControlStatus::Recovering(state) => {
516 let temp_place_holder = DatabaseRecoveringStage::Resetting {
517 resetting_partial_graphs: Default::default(),
518 reset_resps: vec![],
519 backoff_future: None,
520 };
521 match state.metrics.recovery_timer.take() {
522 Some(recovery_timer) => {
523 recovery_timer.observe_duration();
524 }
525 _ => {
526 if cfg!(debug_assertions) {
527 panic!(
528 "take database {} recovery latency for twice",
529 self.database_id
530 )
531 } else {
532 warn!(database_id = %self.database_id,"failed to take recovery latency")
533 }
534 }
535 }
536 match replace(&mut state.stage, temp_place_holder) {
537 DatabaseRecoveringStage::Resetting { .. } => {
538 unreachable!("can only enter running during initializing")
539 }
540 DatabaseRecoveringStage::Initializing {
541 initial_barrier_collector,
542 } => {
543 *database_status = DatabaseCheckpointControlStatus::Running(
544 initial_barrier_collector.finish(),
545 );
546 }
547 }
548 }
549 }
550 }
551}