use std::collections::{HashMap, HashSet};
use std::mem::{replace, take};
use std::task::{Context, Poll};

use futures::FutureExt;
use prometheus::{HistogramTimer, IntCounter};
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
use crate::barrier::checkpoint::{BarrierWorkerState, CheckpointControl};
use crate::barrier::complete_task::BarrierCompleteOutput;
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::rpc::{ControlStreamManager, DatabaseInitialBarrierCollector};
use crate::barrier::worker::{
    RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;

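/// Stage of a database that is undergoing per-database recovery.
///
/// Recovery first enters `Resetting`, where the meta node waits for every worker to
/// acknowledge the reset request (and for an optional retry backoff to elapse), and then
/// moves to `Initializing`, where it waits for the initial barrier to be collected
/// before the database goes back to running.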
enum DatabaseRecoveringStage {
    Resetting {
        remaining_workers: HashSet<WorkerId>,
        reset_resps: HashMap<WorkerId, ResetDatabaseResponse>,
        reset_request_id: u32,
        backoff_future: Option<RetryBackoffFuture>,
    },
    Initializing {
        initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
    },
}

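/// Recovery state machine of a single database: the current stage, the id to use for the
/// next reset request, the retry backoff strategy, and recovery metrics.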
pub(crate) struct DatabaseRecoveringState {
    stage: DatabaseRecoveringStage,
    next_reset_request_id: u32,
    retry_backoff_strategy: RetryBackoffStrategy,
    metrics: DatabaseRecoveryMetrics,
}

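/// Action returned by [`DatabaseRecoveringState::poll_next_event`] once the current stage
/// has finished, telling the caller which transition to perform next.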
pub(super) enum RecoveringStateAction {
    EnterInitializing(HashMap<WorkerId, ResetDatabaseResponse>),
    EnterRunning,
}

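/// Per-database recovery metrics: a failure counter and a latency timer that is observed
/// once when the database enters running again.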
struct DatabaseRecoveryMetrics {
    recovery_failure_cnt: IntCounter,
    recovery_timer: Option<HistogramTimer>,
}

impl DatabaseRecoveryMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = format!("database {}", database_id.database_id);
        Self {
            recovery_failure_cnt: GLOBAL_META_METRICS
                .recovery_failure_cnt
                .with_label_values(&[database_id_str.as_str()]),
            recovery_timer: Some(
                GLOBAL_META_METRICS
                    .recovery_latency
                    .with_label_values(&[database_id_str.as_str()])
                    .start_timer(),
            ),
        }
    }
}

const INITIAL_RESET_REQUEST_ID: u32 = 0;

impl DatabaseRecoveringState {
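    /// Create a recovery state that starts in the `Resetting` stage: issue the first reset
    /// request through the control stream manager and arm the first backoff future.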
    pub(super) fn resetting(
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Self {
        let mut retry_backoff_strategy = get_retry_backoff_strategy();
        let backoff_future = retry_backoff_strategy.next().unwrap();
        let metrics = DatabaseRecoveryMetrics::new(database_id);
        metrics.recovery_failure_cnt.inc();

        Self {
            stage: DatabaseRecoveringStage::Resetting {
                remaining_workers: control_stream_manager
                    .reset_database(database_id, INITIAL_RESET_REQUEST_ID),
                reset_resps: Default::default(),
                reset_request_id: INITIAL_RESET_REQUEST_ID,
                backoff_future: Some(backoff_future),
            },
            next_reset_request_id: INITIAL_RESET_REQUEST_ID + 1,
            retry_backoff_strategy,
            metrics,
        }
    }

    fn next_retry(&mut self) -> (RetryBackoffFuture, u32) {
        let backoff_future = self
            .retry_backoff_strategy
            .next()
            .expect("should not be empty");
        let request_id = self.next_reset_request_id;
        self.next_reset_request_id += 1;
        (backoff_future, request_id)
    }

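    /// Handle a barrier-complete response received while the database is still recovering.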
    pub(super) fn barrier_collected(
        &mut self,
        database_id: DatabaseId,
        resp: BarrierCompleteResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting { .. } => {
                // While resetting, any collected barrier belongs to the previous, aborted
                // attempt and is simply ignored.
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
            } => {
                let worker_id = resp.worker_id as WorkerId;
                initial_barrier_collector.collect_resp(resp);
                info!(
                    ?database_id,
                    worker_id,
                    remaining_workers = ?initial_barrier_collector,
                    "initializing database barrier collected"
                );
            }
        }
    }

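    /// Handle an error on `worker_id`. In `Resetting` the worker is simply dropped from the
    /// pending set; in `Initializing` the decision is delegated to the barrier collector.
    /// Returns whether the current recovery attempt is still valid.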
    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers, ..
            } => {
                remaining_workers.remove(&worker_id);
                true
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
        }
    }

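    /// Record a reset response from a worker, ignoring stale responses that carry an older
    /// reset request id.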
    pub(super) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                reset_request_id,
                ..
            } => {
                if resp.reset_request_id < *reset_request_id {
                    info!(
                        database_id = resp.database_id,
                        worker_id,
                        received_request_id = resp.reset_request_id,
                        ongoing_request_id = reset_request_id,
                        "ignore stale reset response"
                    );
                } else {
                    assert_eq!(resp.reset_request_id, *reset_request_id);
                    assert!(remaining_workers.remove(&worker_id));
                    reset_resps
                        .try_insert(worker_id, resp)
                        .expect("non-duplicate");
                }
            }
            DatabaseRecoveringStage::Initializing { .. } => {
                unreachable!("all reset resp should have been received in Resetting")
            }
        }
    }

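    /// Poll the recovery state machine: in `Resetting`, become ready once the backoff has
    /// elapsed and all workers have acknowledged the reset; in `Initializing`, become ready
    /// once the initial barrier has been fully collected.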
    pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                backoff_future: backoff_future_option,
                ..
            } => {
                let pass_backoff = if let Some(backoff_future) = backoff_future_option {
                    if backoff_future.poll_unpin(cx).is_ready() {
                        *backoff_future_option = None;
                        true
                    } else {
                        false
                    }
                } else {
                    true
                };
                if pass_backoff && remaining_workers.is_empty() {
                    return Poll::Ready(RecoveringStateAction::EnterInitializing(take(
                        reset_resps,
                    )));
                }
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => {
                if initial_barrier_collector.is_collected() {
                    return Poll::Ready(RecoveringStateAction::EnterRunning);
                }
            }
        }
        Poll::Pending
    }

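    /// The database state being rebuilt, available only while in the `Initializing` stage.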
    pub(super) fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match &self.stage {
            DatabaseRecoveringStage::Resetting { .. } => None,
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => Some(initial_barrier_collector.database_state()),
        }
    }
}

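/// A pending transition of a database's checkpoint control status. The concrete transition
/// is described by the `action` payload type.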
pub(crate) struct DatabaseStatusAction<'a, A> {
    control: &'a mut CheckpointControl,
    database_id: DatabaseId,
    pub(crate) action: A,
}

impl<A> DatabaseStatusAction<'_, A> {
    pub(crate) fn database_id(&self) -> DatabaseId {
        self.database_id
    }
}

impl CheckpointControl {
    pub(super) fn new_database_status_action<A>(
        &mut self,
        database_id: DatabaseId,
        action: A,
    ) -> DatabaseStatusAction<'_, A> {
        DatabaseStatusAction {
            control: self,
            database_id,
            action,
        }
    }
}

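/// Marker action: move a database into the `Resetting` stage.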
pub(crate) struct EnterReset;

impl DatabaseStatusAction<'_, EnterReset> {
    pub(crate) fn enter(
        self,
        barrier_complete_output: Option<BarrierCompleteOutput>,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        if let Some(output) = barrier_complete_output {
            self.control.ack_completed(output);
        }
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                let reset_request_id = INITIAL_RESET_REQUEST_ID;
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                let metrics = DatabaseRecoveryMetrics::new(self.database_id);
                event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                    EventRecovery::database_recovery_start(self.database_id.database_id),
                )]);
                *database_status =
                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
                        stage: DatabaseRecoveringStage::Resetting {
                            remaining_workers,
                            reset_resps: Default::default(),
                            reset_request_id,
                            backoff_future: None,
                        },
                        next_reset_request_id: reset_request_id + 1,
                        retry_backoff_strategy: get_retry_backoff_strategy(),
                        metrics,
                    });
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    unreachable!("should not enter resetting again")
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                        EventRecovery::database_recovery_failure(self.database_id.database_id),
                    )]);
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(self.database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                }
            },
        }
    }
}

impl CheckpointControl {
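    /// Handle a failure reported for `database_id`. Returns an [`EnterReset`] action when the
    /// database is currently running; if it is already recovering, the ongoing recovery is
    /// updated in place and `None` is returned.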
    pub(crate) fn on_report_failure(
        &mut self,
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                Some(self.new_database_status_action(database_id, EnterReset))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    // Already resetting; the new failure requires no extra action.
                    None
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    warn!(
                        database_id = database_id.database_id,
                        "failure reported during initializing, reset the database again"
                    );
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                    None
                }
            },
        }
    }
}

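/// Action carrying the reset responses collected from all workers: move a database from the
/// `Resetting` stage into the `Initializing` stage.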
pub(crate) struct EnterInitializing(pub(crate) HashMap<WorkerId, ResetDatabaseResponse>);

impl DatabaseStatusAction<'_, EnterInitializing> {
    pub(crate) fn control(&self) -> &CheckpointControl {
        &*self.control
    }

    pub(crate) fn enter(
        self,
        runtime_info: DatabaseRuntimeInfoSnapshot,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        let DatabaseRuntimeInfoSnapshot {
            job_infos,
            mut state_table_committed_epochs,
            mut state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            mut source_splits,
            mut background_jobs,
        } = runtime_info;
        let result: MetaResult<_> = try {
            let mut builder =
                FragmentEdgeBuilder::new(job_infos.values().flatten(), control_stream_manager);
            builder.add_relations(&fragment_relations);
            let mut edges = builder.build();
            control_stream_manager.inject_database_initial_barrier(
                self.database_id,
                job_infos,
                &mut state_table_committed_epochs,
                &mut state_table_log_epochs,
                &mut edges,
                &stream_actors,
                &mut source_splits,
                &mut background_jobs,
                subscription_info,
                false,
                &self.control.hummock_version_stats,
            )?
        };
        match result {
            Ok(initial_barrier_collector) => {
                info!(node_to_collect = ?initial_barrier_collector, database_id = ?self.database_id, "database enter initializing");
                status.stage = DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector: initial_barrier_collector.into(),
                };
            }
            Err(e) => {
                warn!(
                    database_id = self.database_id.database_id,
                    e = %e.as_report(),
                    "failed to inject initial barrier"
                );
                let (backoff_future, reset_request_id) = status.next_retry();
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                status.metrics.recovery_failure_cnt.inc();
                status.stage = DatabaseRecoveringStage::Resetting {
                    remaining_workers,
                    reset_resps: Default::default(),
                    reset_request_id,
                    backoff_future: Some(backoff_future),
                };
            }
        }
    }

    pub(crate) fn remove(self) {
        self.control
            .databases
            .remove(&self.database_id)
            .expect("should exist");
    }
}

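/// Marker action: move a fully initialized database back to the running status.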
pub(crate) struct EnterRunning;

impl DatabaseStatusAction<'_, EnterRunning> {
    pub(crate) fn enter(self) {
        info!(database_id = ?self.database_id, "database enter running");
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::database_recovery_success(self.database_id.database_id),
        )]);
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter running again")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                let temp_place_holder = DatabaseRecoveringStage::Resetting {
                    remaining_workers: Default::default(),
                    reset_resps: Default::default(),
                    reset_request_id: 0,
                    backoff_future: None,
                };
                match state.metrics.recovery_timer.take() {
                    Some(recovery_timer) => {
                        recovery_timer.observe_duration();
                    }
                    _ => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "database {} recovery timer taken twice",
                                self.database_id
                            )
                        } else {
                            warn!(database_id = %self.database_id, "failed to take recovery latency")
                        }
                    }
                }
                match replace(&mut state.stage, temp_place_holder) {
                    DatabaseRecoveringStage::Resetting { .. } => {
                        unreachable!("can only enter running during initializing")
                    }
                    DatabaseRecoveringStage::Initializing {
                        initial_barrier_collector,
                    } => {
                        *database_status = DatabaseCheckpointControlStatus::Running(
                            initial_barrier_collector.finish(),
                        );
                    }
                }
            }
        }
    }
}