use std::collections::{HashMap, HashSet};
use std::mem::{replace, take};
use std::task::{Context, Poll};

use futures::FutureExt;
use prometheus::{HistogramTimer, IntCounter};
use risingwave_common::catalog::DatabaseId;
use risingwave_common::id::JobId;
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
use crate::barrier::checkpoint::{BarrierWorkerState, CheckpointControl};
use crate::barrier::complete_task::BarrierCompleteOutput;
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::rpc::{ControlStreamManager, DatabaseInitialBarrierCollector};
use crate::barrier::worker::{
    RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};

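/// Stage of a database that is undergoing per-database recovery.
///
/// A recovering database first resets its actors on all involved workers
/// (`Resetting`), then injects the initial barrier and waits for it to be
/// collected from every worker (`Initializing`).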
enum DatabaseRecoveringStage {
    Resetting {
        remaining_workers: HashSet<WorkerId>,
        reset_resps: HashMap<WorkerId, ResetDatabaseResponse>,
        reset_request_id: u32,
        backoff_future: Option<RetryBackoffFuture>,
    },
    Initializing {
        initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
    },
}

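/// Recovery state machine of a single database, tracking the current stage,
/// the id to use for the next reset request, the retry backoff strategy, and
/// recovery metrics.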
pub(crate) struct DatabaseRecoveringState {
    stage: DatabaseRecoveringStage,
    next_reset_request_id: u32,
    retry_backoff_strategy: RetryBackoffStrategy,
    metrics: DatabaseRecoveryMetrics,
}

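/// Action to take after polling a recovering database: either enter the
/// `Initializing` stage with the collected reset responses, or enter the
/// running state once the initial barrier has been fully collected.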
pub(super) enum RecoveringStateAction {
    EnterInitializing(HashMap<WorkerId, ResetDatabaseResponse>),
    EnterRunning,
}

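/// Per-database recovery metrics: a failure counter and a latency timer that
/// is started when recovery begins and observed when the database re-enters
/// the running state.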
struct DatabaseRecoveryMetrics {
    recovery_failure_cnt: IntCounter,
    recovery_timer: Option<HistogramTimer>,
}

impl DatabaseRecoveryMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = format!("database {}", database_id);
        Self {
            recovery_failure_cnt: GLOBAL_META_METRICS
                .recovery_failure_cnt
                .with_label_values(&[database_id_str.as_str()]),
            recovery_timer: Some(
                GLOBAL_META_METRICS
                    .recovery_latency
                    .with_label_values(&[database_id_str.as_str()])
                    .start_timer(),
            ),
        }
    }
}

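/// Request id used for the first reset of a database; retries use
/// monotonically increasing ids so that stale reset responses can be detected.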
const INITIAL_RESET_REQUEST_ID: u32 = 0;

impl DatabaseRecoveringState {
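    /// Creates a recovery state in the `Resetting` stage, sending reset
    /// requests to all involved workers and arming the first backoff.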
    pub(super) fn resetting(
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Self {
        let mut retry_backoff_strategy = get_retry_backoff_strategy();
        let backoff_future = retry_backoff_strategy.next().unwrap();
        let metrics = DatabaseRecoveryMetrics::new(database_id);
        metrics.recovery_failure_cnt.inc();

        Self {
            stage: DatabaseRecoveringStage::Resetting {
                remaining_workers: control_stream_manager
                    .reset_database(database_id, INITIAL_RESET_REQUEST_ID),
                reset_resps: Default::default(),
                reset_request_id: INITIAL_RESET_REQUEST_ID,
                backoff_future: Some(backoff_future),
            },
            next_reset_request_id: INITIAL_RESET_REQUEST_ID + 1,
            retry_backoff_strategy,
            metrics,
        }
    }

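    /// Returns the next backoff future together with a fresh reset request id.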
    fn next_retry(&mut self) -> (RetryBackoffFuture, u32) {
        let backoff_future = self
            .retry_backoff_strategy
            .next()
            .expect("should not be empty");
        let request_id = self.next_reset_request_id;
        self.next_reset_request_id += 1;
        (backoff_future, request_id)
    }

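    /// Handles a barrier-complete response received while the database is
    /// recovering; responses are only meaningful in the `Initializing` stage.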
    pub(super) fn barrier_collected(
        &mut self,
        database_id: DatabaseId,
        resp: BarrierCompleteResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting { .. } => {
                // Barrier collect responses are ignored while the database is resetting.
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
            } => {
                let worker_id = resp.worker_id;
                initial_barrier_collector.collect_resp(resp);
                info!(
                    ?database_id,
                    %worker_id,
                    remaining_workers = ?initial_barrier_collector,
                    "initializing database barrier collected"
                );
            }
        }
    }

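    /// Returns whether the database recovery can proceed in its current stage
    /// after the given worker has reported an error.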
    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers, ..
            } => {
                remaining_workers.remove(&worker_id);
                true
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
        }
    }

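    /// Records a reset response from a worker, ignoring responses that belong
    /// to a stale reset request.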
    pub(super) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                reset_request_id,
                ..
            } => {
                if resp.reset_request_id < *reset_request_id {
                    info!(
                        database_id = %resp.database_id,
                        %worker_id,
                        received_request_id = resp.reset_request_id,
                        ongoing_request_id = reset_request_id,
                        "ignore stale reset response"
                    );
                } else {
                    assert_eq!(resp.reset_request_id, *reset_request_id);
                    assert!(remaining_workers.remove(&worker_id));
                    reset_resps
                        .try_insert(worker_id, resp)
                        .expect("non-duplicate");
                }
            }
            DatabaseRecoveringStage::Initializing { .. } => {
                unreachable!("all reset resp should have been received in Resetting")
            }
        }
    }

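    /// Polls the recovery state machine and returns the next action once the
    /// current stage has finished: entering `Initializing` after all workers
    /// have been reset and the backoff has elapsed, or entering running once
    /// the initial barrier has been collected from all workers.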
    pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                backoff_future: backoff_future_option,
                ..
            } => {
                let pass_backoff = if let Some(backoff_future) = backoff_future_option {
                    if backoff_future.poll_unpin(cx).is_ready() {
                        *backoff_future_option = None;
                        true
                    } else {
                        false
                    }
                } else {
                    true
                };
                if pass_backoff && remaining_workers.is_empty() {
                    return Poll::Ready(RecoveringStateAction::EnterInitializing(take(
                        reset_resps,
                    )));
                }
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => {
                if initial_barrier_collector.is_collected() {
                    return Poll::Ready(RecoveringStateAction::EnterRunning);
                }
            }
        }
        Poll::Pending
    }

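    /// Returns the barrier worker state and creating streaming jobs of the
    /// database while it is in the `Initializing` stage, or `None` while
    /// resetting.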
    pub(super) fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match &self.stage {
            DatabaseRecoveringStage::Resetting { .. } => None,
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => Some(initial_barrier_collector.database_state()),
        }
    }
}

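/// A pending status transition on a database tracked by [`CheckpointControl`],
/// pairing the target database id with an action payload `A`.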
pub(crate) struct DatabaseStatusAction<'a, A> {
    control: &'a mut CheckpointControl,
    database_id: DatabaseId,
    pub(crate) action: A,
}

impl<A> DatabaseStatusAction<'_, A> {
    pub(crate) fn database_id(&self) -> DatabaseId {
        self.database_id
    }
}

impl CheckpointControl {
    pub(super) fn new_database_status_action<A>(
        &mut self,
        database_id: DatabaseId,
        action: A,
    ) -> DatabaseStatusAction<'_, A> {
        DatabaseStatusAction {
            control: self,
            database_id,
            action,
        }
    }
}

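/// Action that moves a database into the `Resetting` stage, either from the
/// running state or after a failed initialization attempt.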
pub(crate) struct EnterReset;

impl DatabaseStatusAction<'_, EnterReset> {
    pub(crate) fn enter(
        self,
        barrier_complete_output: Option<BarrierCompleteOutput>,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        if let Some(output) = barrier_complete_output {
            self.control.ack_completed(output);
        }
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                let reset_request_id = INITIAL_RESET_REQUEST_ID;
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                let metrics = DatabaseRecoveryMetrics::new(self.database_id);
                event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                    EventRecovery::database_recovery_start(self.database_id.as_raw_id()),
                )]);
                *database_status =
                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
                        stage: DatabaseRecoveringStage::Resetting {
                            remaining_workers,
                            reset_resps: Default::default(),
                            reset_request_id,
                            backoff_future: None,
                        },
                        next_reset_request_id: reset_request_id + 1,
                        retry_backoff_strategy: get_retry_backoff_strategy(),
                        metrics,
                    });
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    unreachable!("should not enter resetting again")
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                        EventRecovery::database_recovery_failure(self.database_id.as_raw_id()),
                    )]);
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(self.database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                }
            },
        }
    }
}

impl CheckpointControl {
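    /// Handles a reported failure on a database: returns an [`EnterReset`]
    /// action if the database is currently running, or restarts the reset if
    /// the failure happened during initialization.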
    pub(crate) fn on_report_failure(
        &mut self,
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                Some(self.new_database_status_action(database_id, EnterReset))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    // The database is already resetting; no further action is needed.
                    None
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    warn!(database_id = %database_id, "failed to initialize database");
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                    None
                }
            },
        }
    }
}

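/// Action that moves a database from `Resetting` into `Initializing`, carrying
/// the reset responses collected from each worker.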
pub(crate) struct EnterInitializing(pub(crate) HashMap<WorkerId, ResetDatabaseResponse>);

impl DatabaseStatusAction<'_, EnterInitializing> {
    pub(crate) fn enter(
        self,
        runtime_info: DatabaseRuntimeInfoSnapshot,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        let DatabaseRuntimeInfoSnapshot {
            job_infos,
            mut state_table_committed_epochs,
            mut state_table_log_epochs,
            mut mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            mut source_splits,
            mut background_jobs,
            mut cdc_table_snapshot_splits,
        } = runtime_info;
        let result: MetaResult<_> = try {
            let mut builder = FragmentEdgeBuilder::new(
                job_infos
                    .values()
                    .flat_map(|fragment_infos| fragment_infos.values()),
                control_stream_manager,
            );
            builder.add_relations(&fragment_relations);
            let mut edges = builder.build();
            control_stream_manager.inject_database_initial_barrier(
                self.database_id,
                job_infos,
                &mut state_table_committed_epochs,
                &mut state_table_log_epochs,
                &fragment_relations,
                &mut edges,
                &stream_actors,
                &mut source_splits,
                &mut background_jobs,
                &mut mv_depended_subscriptions,
                false,
                &self.control.hummock_version_stats,
                &mut cdc_table_snapshot_splits,
            )?
        };
        match result {
            Ok(initial_barrier_collector) => {
                info!(node_to_collect = ?initial_barrier_collector, database_id = ?self.database_id, "database enter initializing");
                status.stage = DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector: initial_barrier_collector.into(),
                };
            }
            Err(e) => {
                warn!(
                    database_id = %self.database_id,
                    e = %e.as_report(),
                    "failed to inject initial barrier"
                );
                let (backoff_future, reset_request_id) = status.next_retry();
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                status.metrics.recovery_failure_cnt.inc();
                status.stage = DatabaseRecoveringStage::Resetting {
                    remaining_workers,
                    reset_resps: Default::default(),
                    reset_request_id,
                    backoff_future: Some(backoff_future),
                };
            }
        }
    }

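    /// Falls back to the `Resetting` stage (with no outstanding reset
    /// requests) when reloading the database runtime info fails, so that
    /// initialization is retried after the next backoff.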
    pub(crate) fn fail_reload_runtime_info(self, e: MetaError) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        warn!(
            database_id = %self.database_id,
            e = %e.as_report(),
            "failed to reload runtime info"
        );
        let (backoff_future, reset_request_id) = status.next_retry();
        status.metrics.recovery_failure_cnt.inc();
        status.stage = DatabaseRecoveringStage::Resetting {
            remaining_workers: Default::default(),
            reset_resps: Default::default(),
            reset_request_id,
            backoff_future: Some(backoff_future),
        };
    }

    pub(crate) fn remove(self) {
        self.control
            .databases
            .remove(&self.database_id)
            .expect("should exist");
        self.control
            .env
            .shared_actor_infos()
            .remove_database(self.database_id);
    }
}

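/// Action that moves a database from `Initializing` back to the running state
/// once its initial barrier has been collected from all workers.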
pub(crate) struct EnterRunning;

impl DatabaseStatusAction<'_, EnterRunning> {
    pub(crate) fn enter(self) {
        info!(database_id = ?self.database_id, "database enter running");
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::database_recovery_success(self.database_id.as_raw_id()),
        )]);
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter running again")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                let temp_place_holder = DatabaseRecoveringStage::Resetting {
                    remaining_workers: Default::default(),
                    reset_resps: Default::default(),
                    reset_request_id: 0,
                    backoff_future: None,
                };
                match state.metrics.recovery_timer.take() {
                    Some(recovery_timer) => {
                        recovery_timer.observe_duration();
                    }
                    _ => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "recovery latency timer of database {} taken twice",
                                self.database_id
                            )
                        } else {
                            warn!(database_id = %self.database_id, "failed to take recovery latency timer")
                        }
                    }
                }
                match replace(&mut state.stage, temp_place_holder) {
                    DatabaseRecoveringStage::Resetting { .. } => {
                        unreachable!("can only enter running during initializing")
                    }
                    DatabaseRecoveringStage::Initializing {
                        initial_barrier_collector,
                    } => {
                        *database_status = DatabaseCheckpointControlStatus::Running(
                            initial_barrier_collector.finish(),
                        );
                    }
                }
            }
        }
    }
}