use std::collections::{HashMap, HashSet};
use std::mem::{replace, take};
use std::task::{Context, Poll};

use futures::FutureExt;
use prometheus::{HistogramTimer, IntCounter};
use risingwave_common::catalog::DatabaseId;
use risingwave_common::id::JobId;
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
use crate::barrier::checkpoint::{BarrierWorkerState, CheckpointControl};
use crate::barrier::complete_task::BarrierCompleteOutput;
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::rpc::{ControlStreamManager, DatabaseInitialBarrierCollector};
use crate::barrier::worker::{
    RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;

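/// Stage of a single-database recovery. Recovery first waits for every involved
/// worker to acknowledge a `ResetDatabase` request (`Resetting`), then injects
/// and collects the initial barrier (`Initializing`), after which the database
/// transitions back to the running status.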
enum DatabaseRecoveringStage {
    Resetting {
        remaining_workers: HashSet<WorkerId>,
        reset_resps: HashMap<WorkerId, ResetDatabaseResponse>,
        reset_request_id: u32,
        backoff_future: Option<RetryBackoffFuture>,
    },
    Initializing {
        initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
    },
}

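/// Per-database recovery state tracked by the checkpoint control loop.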
pub(crate) struct DatabaseRecoveringState {
    stage: DatabaseRecoveringStage,
    next_reset_request_id: u32,
    retry_backoff_strategy: RetryBackoffStrategy,
    metrics: DatabaseRecoveryMetrics,
}

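/// Follow-up action requested by [`DatabaseRecoveringState::poll_next_event`].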
pub(super) enum RecoveringStateAction {
    EnterInitializing(HashMap<WorkerId, ResetDatabaseResponse>),
    EnterRunning,
}

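/// Metrics of an ongoing database recovery: a failure counter and a timer that
/// measures the total recovery latency.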
struct DatabaseRecoveryMetrics {
    recovery_failure_cnt: IntCounter,
    recovery_timer: Option<HistogramTimer>,
}

impl DatabaseRecoveryMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = format!("database {}", database_id);
        Self {
            recovery_failure_cnt: GLOBAL_META_METRICS
                .recovery_failure_cnt
                .with_label_values(&[database_id_str.as_str()]),
            recovery_timer: Some(
                GLOBAL_META_METRICS
                    .recovery_latency
                    .with_label_values(&[database_id_str.as_str()])
                    .start_timer(),
            ),
        }
    }
}

const INITIAL_RESET_REQUEST_ID: u32 = 0;

impl DatabaseRecoveringState {
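    /// Creates a new recovering state in the `Resetting` stage and broadcasts a
    /// `ResetDatabase` request to all workers hosting the database.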
    pub(super) fn resetting(
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Self {
        let mut retry_backoff_strategy = get_retry_backoff_strategy();
        let backoff_future = retry_backoff_strategy.next().unwrap();
        let metrics = DatabaseRecoveryMetrics::new(database_id);
        metrics.recovery_failure_cnt.inc();

        Self {
            stage: DatabaseRecoveringStage::Resetting {
                remaining_workers: control_stream_manager
                    .reset_database(database_id, INITIAL_RESET_REQUEST_ID),
                reset_resps: Default::default(),
                reset_request_id: INITIAL_RESET_REQUEST_ID,
                backoff_future: Some(backoff_future),
            },
            next_reset_request_id: INITIAL_RESET_REQUEST_ID + 1,
            retry_backoff_strategy,
            metrics,
        }
    }

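    /// Advances the backoff strategy and allocates a fresh reset request id for
    /// the next reset attempt.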
    fn next_retry(&mut self) -> (RetryBackoffFuture, u32) {
        let backoff_future = self
            .retry_backoff_strategy
            .next()
            .expect("should not be empty");
        let request_id = self.next_reset_request_id;
        self.next_reset_request_id += 1;
        (backoff_future, request_id)
    }

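    /// Handles a `BarrierCompleteResponse` that arrives while the database is
    /// recovering. Only responses for the initial barrier are recorded.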
    pub(super) fn barrier_collected(
        &mut self,
        database_id: DatabaseId,
        resp: BarrierCompleteResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting { .. } => {
                // The response belongs to a barrier injected before the reset,
                // so it can be safely ignored.
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
            } => {
                let worker_id = resp.worker_id;
                initial_barrier_collector.collect_resp(resp);
                info!(
                    ?database_id,
                    %worker_id,
                    remaining_workers = ?initial_barrier_collector,
                    "initializing database barrier collected"
                );
            }
        }
    }

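    /// Called when a worker error is reported. Returns whether the current
    /// recovery attempt remains valid without starting another reset.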
    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers, ..
            } => {
                remaining_workers.remove(&worker_id);
                true
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
        }
    }

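    /// Records a `ResetDatabaseResponse` from a worker. Responses carrying a
    /// stale `reset_request_id` are logged and dropped.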
    pub(super) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                reset_request_id,
                ..
            } => {
                if resp.reset_request_id < *reset_request_id {
                    info!(
                        database_id = %resp.database_id,
                        %worker_id,
                        received_request_id = resp.reset_request_id,
                        ongoing_request_id = reset_request_id,
                        "ignore stale reset response"
                    );
                } else {
                    assert_eq!(resp.reset_request_id, *reset_request_id);
                    assert!(remaining_workers.remove(&worker_id));
                    reset_resps
                        .try_insert(worker_id, resp)
                        .expect("non-duplicate");
                }
            }
            DatabaseRecoveringStage::Initializing { .. } => {
                unreachable!("all reset resp should have been received in Resetting")
            }
        }
    }

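    /// Polls the recovery state machine. Returns `Ready` with the next action
    /// once the backoff has elapsed and all reset responses (or all initial
    /// barriers) have been collected; otherwise returns `Pending`.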
    pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                backoff_future: backoff_future_option,
                ..
            } => {
                let pass_backoff = if let Some(backoff_future) = backoff_future_option {
                    if backoff_future.poll_unpin(cx).is_ready() {
                        *backoff_future_option = None;
                        true
                    } else {
                        false
                    }
                } else {
                    true
                };
                if pass_backoff && remaining_workers.is_empty() {
                    return Poll::Ready(RecoveringStateAction::EnterInitializing(take(
                        reset_resps,
                    )));
                }
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => {
                if initial_barrier_collector.is_collected() {
                    return Poll::Ready(RecoveringStateAction::EnterRunning);
                }
            }
        }
        Poll::Pending
    }

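    /// Returns the barrier worker state and the creating streaming jobs of the
    /// database once it has reached `Initializing`; `None` while still resetting.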
    pub(super) fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match &self.stage {
            DatabaseRecoveringStage::Resetting { .. } => None,
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => Some(initial_barrier_collector.database_state()),
        }
    }
}

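/// A pending status transition of a database, borrowing the [`CheckpointControl`]
/// it will be applied to. The concrete transition is described by `A`.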
pub(crate) struct DatabaseStatusAction<'a, A> {
    control: &'a mut CheckpointControl,
    database_id: DatabaseId,
    pub(crate) action: A,
}

impl<A> DatabaseStatusAction<'_, A> {
    pub(crate) fn database_id(&self) -> DatabaseId {
        self.database_id
    }
}

impl CheckpointControl {
    pub(super) fn new_database_status_action<A>(
        &mut self,
        database_id: DatabaseId,
        action: A,
    ) -> DatabaseStatusAction<'_, A> {
        DatabaseStatusAction {
            control: self,
            database_id,
            action,
        }
    }
}

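/// Action: transition a database into the `Resetting` stage.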
pub(crate) struct EnterReset;

impl DatabaseStatusAction<'_, EnterReset> {
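    /// Moves the database into the `Resetting` stage: acknowledges any pending
    /// complete output, sends `ResetDatabase` to the involved workers, and
    /// records recovery events and metrics.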
    pub(crate) fn enter(
        self,
        barrier_complete_output: Option<BarrierCompleteOutput>,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        if let Some(output) = barrier_complete_output {
            self.control.ack_completed(output);
        }
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                let reset_request_id = INITIAL_RESET_REQUEST_ID;
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                let metrics = DatabaseRecoveryMetrics::new(self.database_id);
                event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                    EventRecovery::database_recovery_start(self.database_id.as_raw_id()),
                )]);
                *database_status =
                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
                        stage: DatabaseRecoveringStage::Resetting {
                            remaining_workers,
                            reset_resps: Default::default(),
                            reset_request_id,
                            backoff_future: None,
                        },
                        next_reset_request_id: reset_request_id + 1,
                        retry_backoff_strategy: get_retry_backoff_strategy(),
                        metrics,
                    });
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    unreachable!("should not enter resetting again")
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                        EventRecovery::database_recovery_failure(self.database_id.as_raw_id()),
                    )]);
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(self.database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                }
            },
        }
    }
}

impl CheckpointControl {
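    /// Handles a reported failure for the database. Returns an [`EnterReset`]
    /// action when the database is still running; if it is already recovering,
    /// the reset is retried (or the failure ignored) in place and `None` is
    /// returned.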
    pub(crate) fn on_report_failure(
        &mut self,
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                Some(self.new_database_status_action(database_id, EnterReset))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    // Already resetting; nothing else to do for this failure.
                    None
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    warn!(database_id = %database_id, "failure reported while initializing, reset database again");
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                    None
                }
            },
        }
    }
}

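/// Action: transition a database from `Resetting` into `Initializing`, carrying
/// the reset responses collected from every worker.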
pub(crate) struct EnterInitializing(pub(crate) HashMap<WorkerId, ResetDatabaseResponse>);

impl DatabaseStatusAction<'_, EnterInitializing> {
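    /// Builds fragment edges from the runtime info snapshot and injects the
    /// initial barrier. On failure, falls back to another `Resetting` round with
    /// backoff.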
    pub(crate) fn enter(
        self,
        runtime_info: DatabaseRuntimeInfoSnapshot,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        let DatabaseRuntimeInfoSnapshot {
            job_infos,
            mut state_table_committed_epochs,
            mut state_table_log_epochs,
            mut mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            mut source_splits,
            mut background_jobs,
            mut cdc_table_snapshot_split_assignment,
        } = runtime_info;
        let result: MetaResult<_> = try {
            let mut builder = FragmentEdgeBuilder::new(
                job_infos
                    .values()
                    .flat_map(|fragment_infos| fragment_infos.values()),
                control_stream_manager,
            );
            builder.add_relations(&fragment_relations);
            let mut edges = builder.build();
            control_stream_manager.inject_database_initial_barrier(
                self.database_id,
                job_infos,
                &mut state_table_committed_epochs,
                &mut state_table_log_epochs,
                &mut edges,
                &stream_actors,
                &mut source_splits,
                &mut background_jobs,
                &mut mv_depended_subscriptions,
                false,
                &self.control.hummock_version_stats,
                &mut cdc_table_snapshot_split_assignment,
            )?
        };
        match result {
            Ok(initial_barrier_collector) => {
                info!(node_to_collect = ?initial_barrier_collector, database_id = ?self.database_id, "database enter initializing");
                status.stage = DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector: initial_barrier_collector.into(),
                };
            }
            Err(e) => {
                warn!(
                    database_id = %self.database_id,
                    e = %e.as_report(),
                    "failed to inject initial barrier"
                );
                let (backoff_future, reset_request_id) = status.next_retry();
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                status.metrics.recovery_failure_cnt.inc();
                status.stage = DatabaseRecoveringStage::Resetting {
                    remaining_workers,
                    reset_resps: Default::default(),
                    reset_request_id,
                    backoff_future: Some(backoff_future),
                };
            }
        }
    }

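    /// Removes the database from checkpoint control entirely, together with its
    /// shared actor info.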
    pub(crate) fn remove(self) {
        self.control
            .databases
            .remove(&self.database_id)
            .expect("should exist");
        self.control
            .env
            .shared_actor_infos()
            .remove_database(self.database_id);
    }
}

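/// Action: transition a fully initialized database back to the running status.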
pub(crate) struct EnterRunning;

impl DatabaseStatusAction<'_, EnterRunning> {
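    /// Finishes recovery: records the success event, observes the recovery
    /// latency, and replaces the recovering status with a running one built
    /// from the initial barrier collector.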
    pub(crate) fn enter(self) {
        info!(database_id = ?self.database_id, "database enter running");
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::database_recovery_success(self.database_id.as_raw_id()),
        )]);
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter running again")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                // Placeholder stage so that the current stage can be moved out below.
                let temp_place_holder = DatabaseRecoveringStage::Resetting {
                    remaining_workers: Default::default(),
                    reset_resps: Default::default(),
                    reset_request_id: 0,
                    backoff_future: None,
                };
                match state.metrics.recovery_timer.take() {
                    Some(recovery_timer) => {
                        recovery_timer.observe_duration();
                    }
                    _ => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "recovery latency of database {} taken twice",
                                self.database_id
                            )
                        } else {
                            warn!(database_id = %self.database_id, "failed to take recovery latency")
                        }
                    }
                }
                match replace(&mut state.stage, temp_place_holder) {
                    DatabaseRecoveringStage::Resetting { .. } => {
                        unreachable!("can only enter running during initializing")
                    }
                    DatabaseRecoveringStage::Initializing {
                        initial_barrier_collector,
                    } => {
                        *database_status = DatabaseCheckpointControlStatus::Running(
                            initial_barrier_collector.finish(),
                        );
                    }
                }
            }
        }
    }
}