1use std::collections::{HashMap, HashSet};
16use std::mem::{replace, take};
17use std::task::{Context, Poll};
18
19use futures::FutureExt;
20use prometheus::{HistogramTimer, IntCounter};
21use risingwave_common::catalog::DatabaseId;
22use risingwave_meta_model::WorkerId;
23use risingwave_pb::id::PartialGraphId;
24use risingwave_pb::meta::event_log::{Event, EventRecovery};
25use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
26use thiserror_ext::AsReport;
27use tracing::{info, warn};
28
29use crate::barrier::DatabaseRuntimeInfoSnapshot;
30use crate::barrier::checkpoint::CheckpointControl;
31use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
32use crate::barrier::complete_task::BarrierCompleteOutput;
33use crate::barrier::context::recovery::RenderedDatabaseRuntimeInfo;
34use crate::barrier::partial_graph::PartialGraphManager;
35use crate::barrier::rpc::{DatabaseInitialBarrierCollector, to_partial_graph_id};
36use crate::barrier::worker::{
37 RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
38};
39use crate::rpc::metrics::GLOBAL_META_METRICS;
40use crate::{MetaError, MetaResult};
41
/// Recovery stage of a single database.
///
/// A recovering database moves between two stages:
/// - [`Resetting`](Self::Resetting): waiting until every partial graph in
///   `resetting_partial_graphs` has reported its reset, and until the optional retry backoff
///   has elapsed.
/// - [`Initializing`](Self::Initializing): waiting for the database's initial barrier to be
///   collected by the [`DatabaseInitialBarrierCollector`].
///
/// A failure while `Initializing` sends the database back to `Resetting`.
enum DatabaseRecoveringStage {
    /// Waiting for partial graphs to finish resetting.
    Resetting {
        /// Partial graphs whose reset response has not been received yet.
        resetting_partial_graphs: HashSet<PartialGraphId>,
        /// Reset responses received so far, keyed by worker id.
        reset_resps: Vec<(WorkerId, ResetPartialGraphResponse)>,
        /// Backoff that must complete before leaving this stage; `None` means no remaining delay.
        backoff_future: Option<RetryBackoffFuture>,
    },
    /// Waiting for the initial barrier of the database to be collected.
    Initializing {
        initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
    },
}
80
/// Recovery bookkeeping for one database, stored in
/// `DatabaseCheckpointControlStatus::Recovering` while the database is not running.
pub(crate) struct DatabaseRecoveringState {
    /// Current recovery stage.
    stage: DatabaseRecoveringStage,
    /// Source of retry backoff futures; a new one is taken (via `next_retry`) whenever a
    /// recovery attempt fails.
    retry_backoff_strategy: RetryBackoffStrategy,
    /// Per-database recovery failure counter and recovery latency timer.
    metrics: DatabaseRecoveryMetrics,
}
86
/// Stage transition reported by [`DatabaseRecoveringState::poll_next_event`].
pub(super) enum RecoveringStateAction {
    /// Every pending partial-graph reset has been acknowledged and any backoff has elapsed.
    /// Carries the reset responses collected while resetting.
    EnterInitializing(Vec<(WorkerId, ResetPartialGraphResponse)>),
    /// The database's initial barrier has been collected.
    EnterRunning,
}
91
/// Prometheus metrics tracked for one database while it is recovering.
struct DatabaseRecoveryMetrics {
    /// Counts failed recovery attempts, labelled by database.
    recovery_failure_cnt: IntCounter,
    /// Recovery latency timer, started on construction. It is taken and observed when the
    /// database enters running; `None` afterwards.
    recovery_timer: Option<HistogramTimer>,
}
96
97impl DatabaseRecoveryMetrics {
98 fn new(database_id: DatabaseId) -> Self {
99 let database_id_str = format!("database {}", database_id);
100 Self {
101 recovery_failure_cnt: GLOBAL_META_METRICS
102 .recovery_failure_cnt
103 .with_label_values(&[database_id_str.as_str()]),
104 recovery_timer: Some(
105 GLOBAL_META_METRICS
106 .recovery_latency
107 .with_label_values(&[database_id_str.as_str()])
108 .start_timer(),
109 ),
110 }
111 }
112}
113
114impl DatabaseRecoveringState {
115 pub(super) fn new_resetting(
116 database_id: DatabaseId,
117 resetting_partial_graphs: HashSet<PartialGraphId>,
118 ) -> Self {
119 let mut retry_backoff_strategy = get_retry_backoff_strategy();
120 let backoff_future = retry_backoff_strategy.next().unwrap();
121 let metrics = DatabaseRecoveryMetrics::new(database_id);
122 metrics.recovery_failure_cnt.inc();
123 Self {
124 stage: DatabaseRecoveringStage::Resetting {
125 resetting_partial_graphs,
126 reset_resps: vec![],
127 backoff_future: Some(backoff_future),
128 },
129 retry_backoff_strategy,
130 metrics,
131 }
132 }
133
134 fn next_retry(&mut self) -> RetryBackoffFuture {
135 self.retry_backoff_strategy
136 .next()
137 .expect("should not be empty")
138 }
139
140 pub(super) fn partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
141 match &mut self.stage {
142 DatabaseRecoveringStage::Resetting { .. } => {
143 unreachable!("should not have partial graph initialized when resetting")
144 }
145 DatabaseRecoveringStage::Initializing {
146 initial_barrier_collector,
147 } => {
148 initial_barrier_collector.partial_graph_initialized(partial_graph_id);
149 }
150 }
151 }
152
153 pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
154 match &mut self.stage {
155 DatabaseRecoveringStage::Resetting { .. } => true,
156 DatabaseRecoveringStage::Initializing {
157 initial_barrier_collector,
158 ..
159 } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
160 }
161 }
162
163 pub(super) fn on_partial_graph_reset(
164 &mut self,
165 partial_graph_id: PartialGraphId,
166 new_reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
167 ) {
168 match &mut self.stage {
169 DatabaseRecoveringStage::Resetting {
170 resetting_partial_graphs,
171 reset_resps,
172 ..
173 } => {
174 assert!(resetting_partial_graphs.remove(&partial_graph_id));
175 reset_resps.extend(new_reset_resps);
176 }
177 DatabaseRecoveringStage::Initializing { .. } => {
178 unreachable!("all reset resp should have been received in Resetting")
179 }
180 }
181 }
182
183 pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
184 match &mut self.stage {
185 DatabaseRecoveringStage::Resetting {
186 resetting_partial_graphs,
187 reset_resps,
188 backoff_future: backoff_future_option,
189 } => {
190 let pass_backoff = if let Some(backoff_future) = backoff_future_option {
191 if backoff_future.poll_unpin(cx).is_ready() {
192 *backoff_future_option = None;
193 true
194 } else {
195 false
196 }
197 } else {
198 true
199 };
200 if pass_backoff && resetting_partial_graphs.is_empty() {
201 return Poll::Ready(RecoveringStateAction::EnterInitializing(take(
202 reset_resps,
203 )));
204 }
205 }
206 DatabaseRecoveringStage::Initializing {
207 initial_barrier_collector,
208 ..
209 } => {
210 if initial_barrier_collector.is_collected() {
211 return Poll::Ready(RecoveringStateAction::EnterRunning);
212 }
213 }
214 }
215 Poll::Pending
216 }
217}
218
/// A pending status transition `action` for the database `database_id`.
///
/// It holds a mutable borrow of the [`CheckpointControl`] that the transition will be applied
/// to. The concrete `enter`-style methods are implemented per action type.
pub(crate) struct DatabaseStatusAction<'a, A> {
    control: &'a mut CheckpointControl,
    database_id: DatabaseId,
    pub(crate) action: A,
}
224
225impl<A> DatabaseStatusAction<'_, A> {
226 pub(crate) fn database_id(&self) -> DatabaseId {
227 self.database_id
228 }
229}
230
231impl CheckpointControl {
232 pub(super) fn new_database_status_action<A>(
233 &mut self,
234 database_id: DatabaseId,
235 action: A,
236 ) -> DatabaseStatusAction<'_, A> {
237 DatabaseStatusAction {
238 control: self,
239 database_id,
240 action,
241 }
242 }
243}
244
/// Action marker: move a database into the `Resetting` recovery stage.
pub(crate) struct EnterReset;
246
impl DatabaseStatusAction<'_, EnterReset> {
    /// Moves the database into the `Resetting` recovery stage.
    ///
    /// If `barrier_complete_output` is provided, it is acknowledged first. Then:
    /// - `Running`: every independent checkpoint job is drained. Job partial graphs whose
    ///   `job.reset()` returns `true` are checked with `partial_graph_manager.assert_resetting`.
    ///   All other job partial graphs, plus the partial graph for `(database_id, None)`, are
    ///   reset now. A fresh recovering state is installed with no backoff and new metrics, and a
    ///   `database_recovery_start` event is logged.
    /// - `Recovering` / `Initializing`: all partial graphs known to the initial barrier collector
    ///   are reset. A `database_recovery_failure` event is logged, the failure counter is
    ///   incremented and the next retry backoff is armed.
    ///
    /// # Panics
    /// If the database does not exist or is already in the `Resetting` stage.
    pub(crate) fn enter(
        self,
        barrier_complete_output: Option<BarrierCompleteOutput>,
        partial_graph_manager: &mut PartialGraphManager,
    ) {
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        // Acknowledge before `databases` is borrowed mutably below.
        if let Some(output) = barrier_complete_output {
            self.control.ack_completed(partial_graph_manager, output);
        }
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                // Graphs whose job reports `reset() == true`; collected as a side effect of the
                // `filter_map` below.
                let mut resetting_partial_graphs = HashSet::new();
                // Graphs that this call must reset itself.
                let new_reset_partial_graphs: HashSet<_> = database
                    .independent_checkpoint_job_controls
                    .drain()
                    .filter_map(|(job_id, job)| {
                        let partial_graph_id = to_partial_graph_id(self.database_id, Some(job_id));
                        if job.reset() {
                            resetting_partial_graphs.insert(partial_graph_id);
                            None
                        } else {
                            Some(partial_graph_id)
                        }
                    })
                    .chain([to_partial_graph_id(self.database_id, None)])
                    .collect();
                resetting_partial_graphs
                    .iter()
                    .for_each(|partial_graph_id| {
                        partial_graph_manager.assert_resetting(*partial_graph_id)
                    });
                partial_graph_manager
                    .reset_partial_graphs(new_reset_partial_graphs.iter().copied());
                // Wait for the reset of both groups.
                resetting_partial_graphs.extend(new_reset_partial_graphs);

                let metrics = DatabaseRecoveryMetrics::new(self.database_id);
                event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                    EventRecovery::database_recovery_start(self.database_id.as_raw_id()),
                )]);
                *database_status =
                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
                        stage: DatabaseRecoveringStage::Resetting {
                            resetting_partial_graphs,
                            reset_resps: vec![],
                            // No backoff before the first recovery attempt.
                            backoff_future: None,
                        },
                        retry_backoff_strategy: get_retry_backoff_strategy(),
                        metrics,
                    });
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match &mut state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    unreachable!("should not enter resetting again")
                }
                DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector,
                } => {
                    // An initialization attempt failed: reset everything it touched and retry
                    // after the next backoff.
                    let partial_graphs: HashSet<_> =
                        initial_barrier_collector.all_partial_graphs().collect();
                    event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                        EventRecovery::database_recovery_failure(self.database_id.as_raw_id()),
                    )]);
                    let backoff_future = state.next_retry();
                    partial_graph_manager.reset_partial_graphs(partial_graphs.iter().copied());
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        resetting_partial_graphs: partial_graphs,
                        reset_resps: vec![],
                        backoff_future: Some(backoff_future),
                    };
                }
            },
        }
    }
}
328
329impl CheckpointControl {
330 pub(crate) fn on_report_failure(
331 &mut self,
332 database_id: DatabaseId,
333 partial_graph_manager: &mut PartialGraphManager,
334 ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
335 let database_status = self.databases.get_mut(&database_id).expect("should exist");
336 match database_status {
337 DatabaseCheckpointControlStatus::Running(_) => {
338 Some(self.new_database_status_action(database_id, EnterReset))
339 }
340 DatabaseCheckpointControlStatus::Recovering(state) => match &mut state.stage {
341 DatabaseRecoveringStage::Resetting { .. } => {
342 None
344 }
345 DatabaseRecoveringStage::Initializing {
346 initial_barrier_collector,
347 } => {
348 warn!(database_id = %database_id, "failed to initialize database");
349 let partial_graphs: HashSet<_> =
350 initial_barrier_collector.all_partial_graphs().collect();
351 let backoff_future = state.next_retry();
352 partial_graph_manager.reset_partial_graphs(partial_graphs.iter().copied());
353 state.metrics.recovery_failure_cnt.inc();
354 state.stage = DatabaseRecoveringStage::Resetting {
355 resetting_partial_graphs: partial_graphs,
356 reset_resps: vec![],
357 backoff_future: Some(backoff_future),
358 };
359 None
360 }
361 },
362 }
363 }
364}
365
/// Action marker: start initializing a database whose partial graphs finished resetting.
///
/// Carries the reset responses collected during the `Resetting` stage.
pub(crate) struct EnterInitializing(pub(crate) Vec<(WorkerId, ResetPartialGraphResponse)>);
367
impl DatabaseStatusAction<'_, EnterInitializing> {
    /// Injects the database's initial barrier using the reloaded runtime info.
    ///
    /// The database must be in the `Resetting` stage.
    /// - On success, it moves to `Initializing`, with a collector over the partial graphs
    ///   reported by `recoverer.all_initializing()`.
    /// - On failure, the error is logged, the failure counter is incremented and the next retry
    ///   backoff is armed. The database then stays in `Resetting`, waiting on the partial graphs
    ///   returned by `recoverer.failed()`.
    ///
    /// NOTE(review): the action payload (`self.action.0`) is not read in this method.
    ///
    /// # Panics
    /// If the database does not exist, is running, or is already initializing.
    pub(crate) fn enter(
        self,
        runtime_info: DatabaseRuntimeInfoSnapshot,
        rendered_info: RenderedDatabaseRuntimeInfo,
        partial_graph_manager: &mut PartialGraphManager,
    ) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        let DatabaseRuntimeInfoSnapshot {
            recovery_context,
            mut state_table_committed_epochs,
            mut state_table_log_epochs,
            mut mv_depended_subscriptions,
            mut background_jobs,
            mut cdc_table_snapshot_splits,
        } = runtime_info;
        let fragment_relations = &recovery_context.fragment_relations;
        let RenderedDatabaseRuntimeInfo {
            job_infos,
            stream_actors,
            mut source_splits,
            batch_refresh,
        } = rendered_info;
        let mut recoverer = partial_graph_manager.start_recover();
        // `try` block (nightly) so that `?` inside yields a `MetaResult` instead of returning.
        let result: MetaResult<_> = try {
            recoverer.inject_database_initial_barrier(
                self.database_id,
                job_infos,
                &recovery_context.job_extra_info,
                &mut state_table_committed_epochs,
                &mut state_table_log_epochs,
                fragment_relations,
                &stream_actors,
                &mut source_splits,
                &mut background_jobs,
                &mut mv_depended_subscriptions,
                false,
                &self.control.hummock_version_stats,
                &mut cdc_table_snapshot_splits,
                batch_refresh,
            )?
        };
        match result {
            Ok(database) => {
                let initializing_partial_graphs = recoverer.all_initializing();
                info!(?initializing_partial_graphs, database_id = ?self.database_id, "database enter initializing");
                status.stage = DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector: DatabaseInitialBarrierCollector {
                        database_id: self.database_id,
                        initializing_partial_graphs,
                        database,
                    }
                    .into(),
                };
            }
            Err(e) => {
                warn!(
                    database_id = %self.database_id,
                    e = %e.as_report(),
                    "failed to inject initial barrier"
                );
                let backoff_future = status.next_retry();
                // Partial graphs touched by the failed attempt must be reset again.
                let resetting_partial_graphs = recoverer.failed();
                status.metrics.recovery_failure_cnt.inc();
                status.stage = DatabaseRecoveringStage::Resetting {
                    resetting_partial_graphs,
                    reset_resps: vec![],
                    backoff_future: Some(backoff_future),
                };
            }
        }
    }

    /// Handles a failure to reload the database's runtime info before initializing.
    ///
    /// The error is logged, the failure counter is incremented, and the database restarts the
    /// `Resetting` stage with no partial graphs to wait for. Only the freshly armed backoff
    /// therefore gates the next attempt.
    ///
    /// # Panics
    /// If the database does not exist, is running, or is already initializing.
    pub(crate) fn fail_reload_runtime_info(self, e: MetaError) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        warn!(
            database_id = %self.database_id,
            e = %e.as_report(),
            "failed to reload runtime info"
        );
        let backoff_future = status.next_retry();
        status.metrics.recovery_failure_cnt.inc();
        status.stage = DatabaseRecoveringStage::Resetting {
            resetting_partial_graphs: HashSet::new(),
            reset_resps: vec![],
            backoff_future: Some(backoff_future),
        };
    }

    /// Removes the database from this control and from the env's shared actor infos.
    ///
    /// # Panics
    /// If the database is not tracked by this control.
    pub(crate) fn remove(self) {
        self.control
            .databases
            .remove(&self.database_id)
            .expect("should exist");
        self.control
            .env
            .shared_actor_infos()
            .remove_database(self.database_id);
    }
}
498
/// Action marker: finish recovery and move a database back to running.
pub(crate) struct EnterRunning;
500
501impl DatabaseStatusAction<'_, EnterRunning> {
502 pub(crate) fn enter(self) {
503 info!(database_id = ?self.database_id, "database enter running");
504 let event_log_manager_ref = self.control.env.event_log_manager_ref();
505 event_log_manager_ref.add_event_logs(vec![Event::Recovery(
506 EventRecovery::database_recovery_success(self.database_id.as_raw_id()),
507 )]);
508 let database_status = self
509 .control
510 .databases
511 .get_mut(&self.database_id)
512 .expect("should exist");
513 match database_status {
514 DatabaseCheckpointControlStatus::Running(_) => {
515 unreachable!("should not enter running again")
516 }
517 DatabaseCheckpointControlStatus::Recovering(state) => {
518 let temp_place_holder = DatabaseRecoveringStage::Resetting {
519 resetting_partial_graphs: Default::default(),
520 reset_resps: vec![],
521 backoff_future: None,
522 };
523 match state.metrics.recovery_timer.take() {
524 Some(recovery_timer) => {
525 recovery_timer.observe_duration();
526 }
527 _ => {
528 if cfg!(debug_assertions) {
529 panic!(
530 "take database {} recovery latency for twice",
531 self.database_id
532 )
533 } else {
534 warn!(database_id = %self.database_id,"failed to take recovery latency")
535 }
536 }
537 }
538 match replace(&mut state.stage, temp_place_holder) {
539 DatabaseRecoveringStage::Resetting { .. } => {
540 unreachable!("can only enter running during initializing")
541 }
542 DatabaseRecoveringStage::Initializing {
543 initial_barrier_collector,
544 } => {
545 *database_status = DatabaseCheckpointControlStatus::Running(
546 initial_barrier_collector.finish(),
547 );
548 }
549 }
550 }
551 }
552 }
553}