use std::collections::{HashMap, HashSet};
use std::mem::{replace, take};
use std::task::{Context, Poll};

use futures::FutureExt;
use prometheus::{HistogramTimer, IntCounter};
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
use crate::barrier::checkpoint::{BarrierWorkerState, CheckpointControl};
use crate::barrier::complete_task::BarrierCompleteOutput;
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::rpc::{ControlStreamManager, DatabaseInitialBarrierCollector};
use crate::barrier::worker::{
    RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;

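/// Stage of a database that is going through per-database recovery.
///
/// `Resetting`: a reset request has been sent to the involved workers; we wait for all of
/// them to respond and for the retry backoff to elapse.
/// `Initializing`: the initial barrier has been injected and we wait for it to be collected
/// from every worker before the database becomes running again.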
enum DatabaseRecoveringStage {
    Resetting {
        remaining_workers: HashSet<WorkerId>,
        reset_resps: HashMap<WorkerId, ResetDatabaseResponse>,
        reset_request_id: u32,
        backoff_future: Option<RetryBackoffFuture>,
    },
    Initializing {
        initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
    },
}

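/// Recovery progress of a single database, kept by [`CheckpointControl`] while the database
/// is in the recovering status.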
pub(crate) struct DatabaseRecoveringState {
    stage: DatabaseRecoveringStage,
    next_reset_request_id: u32,
    retry_backoff_strategy: RetryBackoffStrategy,
    metrics: DatabaseRecoveryMetrics,
}

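/// Stage transition requested by [`DatabaseRecoveringState::poll_next_event`].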
pub(super) enum RecoveringStateAction {
    EnterInitializing(HashMap<WorkerId, ResetDatabaseResponse>),
    EnterRunning,
}

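/// Recovery metrics scoped to one database: a failure counter and a latency timer that is
/// stopped when the database re-enters the running state.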
struct DatabaseRecoveryMetrics {
    recovery_failure_cnt: IntCounter,
    recovery_timer: Option<HistogramTimer>,
}

impl DatabaseRecoveryMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = format!("database {}", database_id.database_id);
        Self {
            recovery_failure_cnt: GLOBAL_META_METRICS
                .recovery_failure_cnt
                .with_label_values(&[database_id_str.as_str()]),
            recovery_timer: Some(
                GLOBAL_META_METRICS
                    .recovery_latency
                    .with_label_values(&[database_id_str.as_str()])
                    .start_timer(),
            ),
        }
    }
}

const INITIAL_RESET_REQUEST_ID: u32 = 0;

impl DatabaseRecoveringState {
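    /// Creates the recovery state for a database entering recovery, sending the first reset
    /// request to the involved workers and starting the retry backoff.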
    pub(super) fn resetting(
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Self {
        let mut retry_backoff_strategy = get_retry_backoff_strategy();
        let backoff_future = retry_backoff_strategy.next().unwrap();
        let metrics = DatabaseRecoveryMetrics::new(database_id);
        metrics.recovery_failure_cnt.inc();

        Self {
            stage: DatabaseRecoveringStage::Resetting {
                remaining_workers: control_stream_manager
                    .reset_database(database_id, INITIAL_RESET_REQUEST_ID),
                reset_resps: Default::default(),
                reset_request_id: INITIAL_RESET_REQUEST_ID,
                backoff_future: Some(backoff_future),
            },
            next_reset_request_id: INITIAL_RESET_REQUEST_ID + 1,
            retry_backoff_strategy,
            metrics,
        }
    }

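    /// Returns the backoff future for the next retry together with a freshly allocated reset
    /// request id.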
    fn next_retry(&mut self) -> (RetryBackoffFuture, u32) {
        let backoff_future = self
            .retry_backoff_strategy
            .next()
            .expect("should not be empty");
        let request_id = self.next_reset_request_id;
        self.next_reset_request_id += 1;
        (backoff_future, request_id)
    }

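    /// Handles a barrier-complete response received while the database is recovering.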
    pub(super) fn barrier_collected(
        &mut self,
        database_id: DatabaseId,
        resp: BarrierCompleteResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting { .. } => {
                // A barrier response received while the database is resetting is stale and
                // is simply dropped.
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
            } => {
                let worker_id = resp.worker_id as WorkerId;
                initial_barrier_collector.collect_resp(resp);
                info!(
                    ?database_id,
                    worker_id,
                    remaining_workers = ?initial_barrier_collector,
                    "initializing database barrier collected"
                );
            }
        }
    }

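    /// Updates the stage after a worker error and returns whether the current recovery
    /// attempt remains valid without another reset.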
    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers, ..
            } => {
                remaining_workers.remove(&worker_id);
                true
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
        }
    }

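    /// Records a reset response from a worker, ignoring responses that carry a stale reset
    /// request id.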
    pub(super) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                reset_request_id,
                ..
            } => {
                if resp.reset_request_id < *reset_request_id {
                    info!(
                        database_id = resp.database_id,
                        worker_id,
                        received_request_id = resp.reset_request_id,
                        ongoing_request_id = reset_request_id,
                        "ignore stale reset response"
                    );
                } else {
                    assert_eq!(resp.reset_request_id, *reset_request_id);
                    assert!(remaining_workers.remove(&worker_id));
                    reset_resps
                        .try_insert(worker_id, resp)
                        .expect("non-duplicate");
                }
            }
            DatabaseRecoveringStage::Initializing { .. } => {
                unreachable!("all reset resp should have been received in Resetting")
            }
        }
    }

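    /// Polls for the next stage transition: leave `Resetting` once the backoff has elapsed
    /// and all workers have acknowledged the reset, and leave `Initializing` once the initial
    /// barrier has been collected from every worker.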
    pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                backoff_future: backoff_future_option,
                ..
            } => {
                let pass_backoff = if let Some(backoff_future) = backoff_future_option {
                    if backoff_future.poll_unpin(cx).is_ready() {
                        *backoff_future_option = None;
                        true
                    } else {
                        false
                    }
                } else {
                    true
                };
                if pass_backoff && remaining_workers.is_empty() {
                    return Poll::Ready(RecoveringStateAction::EnterInitializing(take(
                        reset_resps,
                    )));
                }
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => {
                if initial_barrier_collector.is_collected() {
                    return Poll::Ready(RecoveringStateAction::EnterRunning);
                }
            }
        }
        Poll::Pending
    }

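    /// Returns the in-progress database state, which is only available once the database has
    /// reached the `Initializing` stage.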
    pub(super) fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match &self.stage {
            DatabaseRecoveringStage::Resetting { .. } => None,
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => Some(initial_barrier_collector.database_state()),
        }
    }
}

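/// A pending status transition of one database, applied to [`CheckpointControl`] through the
/// `enter` method of the concrete action type.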
pub(crate) struct DatabaseStatusAction<'a, A> {
    control: &'a mut CheckpointControl,
    database_id: DatabaseId,
    pub(crate) action: A,
}

impl<A> DatabaseStatusAction<'_, A> {
    pub(crate) fn database_id(&self) -> DatabaseId {
        self.database_id
    }
}

impl CheckpointControl {
    pub(super) fn new_database_status_action<A>(
        &mut self,
        database_id: DatabaseId,
        action: A,
    ) -> DatabaseStatusAction<'_, A> {
        DatabaseStatusAction {
            control: self,
            database_id,
            action,
        }
    }
}

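/// Action: move a database into the `Resetting` stage, either from the running status or
/// after a failed initialization attempt.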
pub(crate) struct EnterReset;

impl DatabaseStatusAction<'_, EnterReset> {
    pub(crate) fn enter(
        self,
        barrier_complete_output: Option<BarrierCompleteOutput>,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        if let Some(output) = barrier_complete_output {
            self.control.ack_completed(output);
        }
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                let reset_request_id = INITIAL_RESET_REQUEST_ID;
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                let metrics = DatabaseRecoveryMetrics::new(self.database_id);
                event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                    EventRecovery::database_recovery_start(self.database_id.database_id),
                )]);
                *database_status =
                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
                        stage: DatabaseRecoveringStage::Resetting {
                            remaining_workers,
                            reset_resps: Default::default(),
                            reset_request_id,
                            backoff_future: None,
                        },
                        next_reset_request_id: reset_request_id + 1,
                        retry_backoff_strategy: get_retry_backoff_strategy(),
                        metrics,
                    });
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    unreachable!("should not enter resetting again")
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                        EventRecovery::database_recovery_failure(self.database_id.database_id),
                    )]);
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(self.database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                }
            },
        }
    }
}

impl CheckpointControl {
    pub(crate) fn on_report_failure(
        &mut self,
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                Some(self.new_database_status_action(database_id, EnterReset))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    // The database is already resetting; no new action is needed.
                    None
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    warn!(
                        database_id = database_id.database_id,
                        "failure reported while initializing, re-entering resetting"
                    );
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                    None
                }
            },
        }
    }
}

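/// Action: move a database from `Resetting` into `Initializing`, carrying the reset
/// responses collected from all workers.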
pub(crate) struct EnterInitializing(pub(crate) HashMap<WorkerId, ResetDatabaseResponse>);

impl DatabaseStatusAction<'_, EnterInitializing> {
    pub(crate) fn control(&self) -> &CheckpointControl {
        &*self.control
    }

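    /// Injects the initial barrier for the database from the given runtime info snapshot; on
    /// failure, falls back to another round of resetting with backoff.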
    pub(crate) fn enter(
        self,
        runtime_info: DatabaseRuntimeInfoSnapshot,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        let DatabaseRuntimeInfoSnapshot {
            job_infos,
            mut state_table_committed_epochs,
            mut state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            mut source_splits,
            mut background_jobs,
            mut cdc_table_snapshot_split_assignment,
        } = runtime_info;
        let result: MetaResult<_> = try {
            let mut builder =
                FragmentEdgeBuilder::new(job_infos.values().flatten(), control_stream_manager);
            builder.add_relations(&fragment_relations);
            let mut edges = builder.build();
            control_stream_manager.inject_database_initial_barrier(
                self.database_id,
                job_infos,
                &mut state_table_committed_epochs,
                &mut state_table_log_epochs,
                &mut edges,
                &stream_actors,
                &mut source_splits,
                &mut background_jobs,
                subscription_info,
                false,
                &self.control.hummock_version_stats,
                &mut cdc_table_snapshot_split_assignment,
            )?
        };
        match result {
            Ok(initial_barrier_collector) => {
                info!(node_to_collect = ?initial_barrier_collector, database_id = ?self.database_id, "database enter initializing");
                status.stage = DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector: initial_barrier_collector.into(),
                };
            }
            Err(e) => {
                warn!(
                    database_id = self.database_id.database_id,
                    e = %e.as_report(),
                    "failed to inject initial barrier"
                );
                let (backoff_future, reset_request_id) = status.next_retry();
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                status.metrics.recovery_failure_cnt.inc();
                status.stage = DatabaseRecoveringStage::Resetting {
                    remaining_workers,
                    reset_resps: Default::default(),
                    reset_request_id,
                    backoff_future: Some(backoff_future),
                };
            }
        }
    }

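    /// Removes the database from checkpoint control entirely, including its entry in the
    /// shared actor info.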
    pub(crate) fn remove(self) {
        self.control
            .databases
            .remove(&self.database_id)
            .expect("should exist");
        self.control
            .env
            .shared_actor_infos()
            .remove_database(self.database_id);
    }
}

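/// Action: move a database from `Initializing` back to the running status once its initial
/// barrier has been fully collected.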
pub(crate) struct EnterRunning;

impl DatabaseStatusAction<'_, EnterRunning> {
    pub(crate) fn enter(self) {
        info!(database_id = ?self.database_id, "database enter running");
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::database_recovery_success(self.database_id.database_id),
        )]);
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter running again")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                // Placeholder stage swapped in below so that the current stage can be moved
                // out of `state`.
                let temp_place_holder = DatabaseRecoveringStage::Resetting {
                    remaining_workers: Default::default(),
                    reset_resps: Default::default(),
                    reset_request_id: 0,
                    backoff_future: None,
                };
                match state.metrics.recovery_timer.take() {
                    Some(recovery_timer) => {
                        recovery_timer.observe_duration();
                    }
                    _ => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "recovery latency timer of database {} taken twice",
                                self.database_id
                            )
                        } else {
                            warn!(database_id = %self.database_id, "failed to take recovery latency timer")
                        }
                    }
                }
                match replace(&mut state.stage, temp_place_holder) {
                    DatabaseRecoveringStage::Resetting { .. } => {
                        unreachable!("can only enter running during initializing")
                    }
                    DatabaseRecoveringStage::Initializing {
                        initial_barrier_collector,
                    } => {
                        *database_status = DatabaseCheckpointControlStatus::Running(
                            initial_barrier_collector.finish(),
                        );
                    }
                }
            }
        }
    }
}