1use std::collections::{HashMap, HashSet};
16use std::mem::{replace, take};
17use std::sync::LazyLock;
18use std::task::{Context, Poll};
19
20use futures::FutureExt;
21use prometheus::{HistogramTimer, IntCounter};
22use risingwave_common::catalog::{DatabaseId, TableId};
23use risingwave_meta_model::WorkerId;
24use risingwave_pb::meta::event_log::{Event, EventRecovery};
25use risingwave_pb::stream_service::BarrierCompleteResponse;
26use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
27use thiserror_ext::AsReport;
28use tracing::{info, warn};
29
30use crate::MetaResult;
31use crate::barrier::DatabaseRuntimeInfoSnapshot;
32use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
33use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
34use crate::barrier::checkpoint::{BarrierWorkerState, CheckpointControl};
35use crate::barrier::complete_task::BarrierCompleteOutput;
36use crate::barrier::edge_builder::FragmentEdgeBuilder;
37use crate::barrier::rpc::{ControlStreamManager, DatabaseInitialBarrierCollector};
38use crate::barrier::worker::{
39 RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
40};
41use crate::rpc::metrics::GLOBAL_META_METRICS;
42
/// The current stage of an ongoing per-database recovery.
enum DatabaseRecoveringStage {
    /// A reset request has been broadcast to workers; waiting for all of them
    /// to acknowledge it (and for the retry backoff to elapse) before the
    /// initial barrier can be injected.
    Resetting {
        /// Workers that have not yet responded to the current reset request.
        remaining_workers: HashSet<WorkerId>,
        /// Reset responses collected so far, keyed by worker id.
        reset_resps: HashMap<WorkerId, ResetDatabaseResponse>,
        /// Id of the in-flight reset request; responses carrying a smaller id
        /// are stale and ignored.
        reset_request_id: u32,
        /// Backoff to await before leaving this stage; `None` once it has
        /// elapsed (or when no backoff was armed, e.g. first reset from running).
        backoff_future: Option<RetryBackoffFuture>,
    },
    /// The initial barrier has been injected; waiting for it to be collected
    /// from all workers.
    Initializing {
        // Boxed, presumably to keep the enum variant small — TODO confirm.
        initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
    },
}
82
/// Recovery state machine for a single database: alternates between the
/// `Resetting` and `Initializing` stages until the database re-enters running.
pub(crate) struct DatabaseRecoveringState {
    // Current stage of recovery.
    stage: DatabaseRecoveringStage,
    // Monotonically increasing id for the next reset request; lets stale
    // reset responses be distinguished from current ones.
    next_reset_request_id: u32,
    // Yields the backoff future awaited before each retry attempt.
    retry_backoff_strategy: RetryBackoffStrategy,
    // Failure counter and latency timer for this database's recovery.
    metrics: DatabaseRecoveryMetrics,
}
89
/// Action the caller must take after [`DatabaseRecoveringState::poll_next_event`]
/// resolves.
pub(super) enum RecoveringStateAction {
    /// All workers acked the reset and the backoff elapsed: enter the
    /// `Initializing` stage using the collected reset responses.
    EnterInitializing(HashMap<WorkerId, ResetDatabaseResponse>),
    /// The initial barrier has been fully collected: enter running.
    EnterRunning,
}
94
/// Prometheus metrics tracking a single database's recovery attempt.
struct DatabaseRecoveryMetrics {
    // Incremented on every recovery failure/retry for this database.
    recovery_failure_cnt: IntCounter,
    // Started at construction; taken (set to `None`) and observed exactly
    // once when recovery succeeds.
    recovery_timer: Option<HistogramTimer>,
}
99
100impl DatabaseRecoveryMetrics {
101 fn new(database_id: DatabaseId) -> Self {
102 let database_id_str = format!("database {}", database_id.database_id);
103 Self {
104 recovery_failure_cnt: GLOBAL_META_METRICS
105 .recovery_failure_cnt
106 .with_label_values(&[database_id_str.as_str()]),
107 recovery_timer: Some(
108 GLOBAL_META_METRICS
109 .recovery_latency
110 .with_label_values(&[database_id_str.as_str()])
111 .start_timer(),
112 ),
113 }
114 }
115}
116
/// Request id for the first reset request of a recovery; each retry uses a
/// strictly larger id so stale responses can be detected.
const INITIAL_RESET_REQUEST_ID: u32 = 0;
118
impl DatabaseRecoveringState {
    /// Start recovery for `database_id` in the `Resetting` stage: broadcast
    /// the initial reset request to workers, record a failure, and arm the
    /// first retry backoff.
    pub(super) fn resetting(
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Self {
        let mut retry_backoff_strategy = get_retry_backoff_strategy();
        // The backoff strategy is treated as inexhaustible; first item must exist.
        let backoff_future = retry_backoff_strategy.next().unwrap();
        let metrics = DatabaseRecoveryMetrics::new(database_id);
        metrics.recovery_failure_cnt.inc();

        Self {
            stage: DatabaseRecoveringStage::Resetting {
                remaining_workers: control_stream_manager
                    .reset_database(database_id, INITIAL_RESET_REQUEST_ID),
                reset_resps: Default::default(),
                reset_request_id: INITIAL_RESET_REQUEST_ID,
                backoff_future: Some(backoff_future),
            },
            next_reset_request_id: INITIAL_RESET_REQUEST_ID + 1,
            retry_backoff_strategy,
            metrics,
        }
    }

    /// Allocate the next `(backoff_future, reset_request_id)` pair for a retry.
    /// Request ids are handed out monotonically increasing.
    fn next_retry(&mut self) -> (RetryBackoffFuture, u32) {
        let backoff_future = self
            .retry_backoff_strategy
            .next()
            .expect("should not be empty");
        let request_id = self.next_reset_request_id;
        self.next_reset_request_id += 1;
        (backoff_future, request_id)
    }

    /// Handle a barrier-complete response that arrives while this database is
    /// recovering.
    pub(super) fn barrier_collected(
        &mut self,
        database_id: DatabaseId,
        resp: BarrierCompleteResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting { .. } => {
                // Responses arriving during a reset are ignored.
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
            } => {
                let worker_id = resp.worker_id as WorkerId;
                initial_barrier_collector.collect_resp(resp);
                info!(
                    ?database_id,
                    worker_id,
                    remaining_workers = ?initial_barrier_collector,
                    "initializing database barrier collected"
                );
            }
        }
    }

    /// Notify the state that `worker_id` has failed. Returns whether the
    /// current stage can proceed; `false` means recovery must be restarted.
    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers, ..
            } => {
                // A failed worker no longer needs to ack the reset request.
                remaining_workers.remove(&worker_id);
                true
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
        }
    }

    /// Record a worker's response to a reset request.
    ///
    /// Responses with a request id older than the in-flight one are logged
    /// and dropped; a response for the current id must come from a worker
    /// still in `remaining_workers`, exactly once.
    pub(super) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                reset_request_id,
                ..
            } => {
                if resp.reset_request_id < *reset_request_id {
                    info!(
                        database_id = resp.database_id,
                        worker_id,
                        received_request_id = resp.reset_request_id,
                        ongoing_request_id = reset_request_id,
                        "ignore stale reset response"
                    );
                } else {
                    // Ids are allocated by `next_retry`, so a response can
                    // never be newer than the in-flight request.
                    assert_eq!(resp.reset_request_id, *reset_request_id);
                    assert!(remaining_workers.remove(&worker_id));
                    reset_resps
                        .try_insert(worker_id, resp)
                        .expect("non-duplicate");
                }
            }
            DatabaseRecoveringStage::Initializing { .. } => {
                unreachable!("all reset resp should have been received in Resetting")
            }
        }
    }

    /// Poll recovery progress.
    ///
    /// Resolves to [`RecoveringStateAction::EnterInitializing`] once the
    /// backoff has elapsed and all workers acked the reset, or to
    /// [`RecoveringStateAction::EnterRunning`] once the initial barrier is
    /// fully collected; otherwise stays `Pending`. Stage transitions
    /// themselves are driven by the caller via the `enter` methods.
    pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                backoff_future: backoff_future_option,
                ..
            } => {
                // The backoff gates the transition; clear it once it fires so
                // subsequent polls skip it.
                let pass_backoff = if let Some(backoff_future) = backoff_future_option {
                    if backoff_future.poll_unpin(cx).is_ready() {
                        *backoff_future_option = None;
                        true
                    } else {
                        false
                    }
                } else {
                    true
                };
                if pass_backoff && remaining_workers.is_empty() {
                    return Poll::Ready(RecoveringStateAction::EnterInitializing(take(
                        reset_resps,
                    )));
                }
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => {
                if initial_barrier_collector.is_collected() {
                    return Poll::Ready(RecoveringStateAction::EnterRunning);
                }
            }
        }
        Poll::Pending
    }

    /// Snapshot of the database state, available only in the `Initializing`
    /// stage (`None` while resetting). The creating-jobs map is always empty
    /// during recovery, so a shared static empty map is returned.
    pub(super) fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match &self.stage {
            DatabaseRecoveringStage::Resetting { .. } => None,
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => Some((initial_barrier_collector.database_state(), {
                static EMPTY_CREATING_JOBS: LazyLock<
                    HashMap<TableId, CreatingStreamingJobControl>,
                > = LazyLock::new(HashMap::new);
                &EMPTY_CREATING_JOBS
            })),
        }
    }
}
282
/// A pending status transition for one database, pairing mutable access to
/// the [`CheckpointControl`] with an action payload `A` (e.g. [`EnterReset`]).
pub(crate) struct DatabaseStatusAction<'a, A> {
    // Checkpoint control the transition will be applied to.
    control: &'a mut CheckpointControl,
    // The database this action targets.
    database_id: DatabaseId,
    pub(crate) action: A,
}
288
289impl<A> DatabaseStatusAction<'_, A> {
290 pub(crate) fn database_id(&self) -> DatabaseId {
291 self.database_id
292 }
293}
294
295impl CheckpointControl {
296 pub(super) fn new_database_status_action<A>(
297 &mut self,
298 database_id: DatabaseId,
299 action: A,
300 ) -> DatabaseStatusAction<'_, A> {
301 DatabaseStatusAction {
302 control: self,
303 database_id,
304 action,
305 }
306 }
307}
308
/// Marker action: transition a database into the `Resetting` stage.
pub(crate) struct EnterReset;
310
impl DatabaseStatusAction<'_, EnterReset> {
    /// Transition the database into the `Resetting` stage.
    ///
    /// * From `Running`: begin a fresh recovery with the initial request id
    ///   and no backoff, and log a recovery-start event.
    /// * From `Recovering(Initializing)`: a failed initialization attempt;
    ///   log a recovery-failure event and reset again with the next request
    ///   id and retry backoff.
    /// * From `Recovering(Resetting)`: unreachable.
    ///
    /// `barrier_complete_output`, if present, is acked first so the already
    /// completed barrier is not lost.
    pub(crate) fn enter(
        self,
        barrier_complete_output: Option<BarrierCompleteOutput>,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        if let Some(output) = barrier_complete_output {
            self.control.ack_completed(output);
        }
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                let reset_request_id = INITIAL_RESET_REQUEST_ID;
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                let metrics = DatabaseRecoveryMetrics::new(self.database_id);
                event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                    EventRecovery::database_recovery_start(self.database_id.database_id),
                )]);
                // Note: unlike `DatabaseRecoveringState::resetting`, no
                // backoff is armed for the first reset from running.
                *database_status =
                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
                        stage: DatabaseRecoveringStage::Resetting {
                            remaining_workers,
                            reset_resps: Default::default(),
                            reset_request_id,
                            backoff_future: None,
                        },
                        next_reset_request_id: reset_request_id + 1,
                        retry_backoff_strategy: get_retry_backoff_strategy(),
                        metrics,
                    });
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    unreachable!("should not enter resetting again")
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                        EventRecovery::database_recovery_failure(self.database_id.database_id),
                    )]);
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(self.database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                }
            },
        }
    }
}
371
372impl CheckpointControl {
373 pub(crate) fn on_report_failure(
374 &mut self,
375 database_id: DatabaseId,
376 control_stream_manager: &mut ControlStreamManager,
377 ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
378 let database_status = self.databases.get_mut(&database_id).expect("should exist");
379 match database_status {
380 DatabaseCheckpointControlStatus::Running(_) => {
381 Some(self.new_database_status_action(database_id, EnterReset))
382 }
383 DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
384 DatabaseRecoveringStage::Resetting { .. } => {
385 None
387 }
388 DatabaseRecoveringStage::Initializing { .. } => {
389 warn!(database_id = database_id.database_id, "");
390 let (backoff_future, reset_request_id) = state.next_retry();
391 let remaining_workers =
392 control_stream_manager.reset_database(database_id, reset_request_id);
393 state.metrics.recovery_failure_cnt.inc();
394 state.stage = DatabaseRecoveringStage::Resetting {
395 remaining_workers,
396 reset_resps: Default::default(),
397 reset_request_id,
398 backoff_future: Some(backoff_future),
399 };
400 None
401 }
402 },
403 }
404 }
405}
406
/// Action payload: the reset responses collected from all workers, consumed
/// when transitioning the database into the `Initializing` stage.
pub(crate) struct EnterInitializing(pub(crate) HashMap<WorkerId, ResetDatabaseResponse>);
408
impl DatabaseStatusAction<'_, EnterInitializing> {
    /// Read-only access to the checkpoint control, for callers that need to
    /// inspect it before deciding how to proceed.
    pub(crate) fn control(&self) -> &CheckpointControl {
        &*self.control
    }

    /// Transition the database from `Resetting` into `Initializing` by
    /// injecting the initial barrier built from `runtime_info`.
    ///
    /// Must only be called while the database is in the `Resetting` stage.
    /// On injection failure, falls back to another reset with the next
    /// request id and retry backoff.
    pub(crate) fn enter(
        self,
        runtime_info: DatabaseRuntimeInfoSnapshot,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        let DatabaseRuntimeInfoSnapshot {
            database_fragment_info,
            mut state_table_committed_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            mut source_splits,
            mut background_jobs,
        } = runtime_info;
        let result: MetaResult<_> = try {
            // Build fragment edges before injection so the initial barrier
            // carries complete topology information.
            let mut builder = FragmentEdgeBuilder::new(database_fragment_info.fragment_infos());
            builder.add_relations(&fragment_relations);
            let mut edges = builder.build();
            control_stream_manager.inject_database_initial_barrier(
                self.database_id,
                database_fragment_info,
                &mut state_table_committed_epochs,
                &mut edges,
                &stream_actors,
                &mut source_splits,
                &mut background_jobs,
                subscription_info,
                false,
                &self.control.hummock_version_stats,
            )?
        };
        match result {
            Ok(initial_barrier_collector) => {
                info!(node_to_collect = ?initial_barrier_collector, database_id = ?self.database_id, "database enter initializing");
                status.stage = DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector: initial_barrier_collector.into(),
                };
            }
            Err(e) => {
                warn!(
                    database_id = self.database_id.database_id,
                    e = %e.as_report(),
                    "failed to inject initial barrier"
                );
                // Injection failed: schedule another reset round.
                let (backoff_future, reset_request_id) = status.next_retry();
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                status.metrics.recovery_failure_cnt.inc();
                status.stage = DatabaseRecoveringStage::Resetting {
                    remaining_workers,
                    reset_resps: Default::default(),
                    reset_request_id,
                    backoff_future: Some(backoff_future),
                };
            }
        }
    }

    /// Drop the database from checkpoint control entirely instead of
    /// initializing it (e.g. when it no longer exists).
    pub(crate) fn remove(self) {
        self.control
            .databases
            .remove(&self.database_id)
            .expect("should exist");
    }
}
495
/// Marker action: transition a fully recovered database back to `Running`.
pub(crate) struct EnterRunning;
497
impl DatabaseStatusAction<'_, EnterRunning> {
    /// Finish recovery: log a recovery-success event, observe the recovery
    /// latency timer, and move the database from `Recovering(Initializing)`
    /// back to `Running` using the collected initial-barrier state.
    ///
    /// Must only be called once initialization has completed; any other
    /// status is unreachable.
    pub(crate) fn enter(self) {
        info!(database_id = ?self.database_id, "database enter running");
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::database_recovery_success(self.database_id.database_id),
        )]);
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter running again")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                // Throwaway value to swap into `state.stage` so the real
                // stage can be moved out below; never observed afterwards.
                let temp_place_holder = DatabaseRecoveringStage::Resetting {
                    remaining_workers: Default::default(),
                    reset_resps: Default::default(),
                    reset_request_id: 0,
                    backoff_future: None,
                };
                match state.metrics.recovery_timer.take() {
                    Some(recovery_timer) => {
                        recovery_timer.observe_duration();
                    }
                    _ => {
                        // The timer should only ever be taken once; panic in
                        // debug builds, degrade to a warning in release.
                        if cfg!(debug_assertions) {
                            panic!(
                                "take database {} recovery latency for twice",
                                self.database_id
                            )
                        } else {
                            warn!(database_id = %self.database_id,"failed to take recovery latency")
                        }
                    }
                }
                match replace(&mut state.stage, temp_place_holder) {
                    DatabaseRecoveringStage::Resetting { .. } => {
                        unreachable!("can only enter running during initializing")
                    }
                    DatabaseRecoveringStage::Initializing {
                        initial_barrier_collector,
                    } => {
                        *database_status = DatabaseCheckpointControlStatus::Running(
                            initial_barrier_collector.finish(),
                        );
                    }
                }
            }
        }
    }
}