use std::collections::{HashMap, HashSet};
use std::mem::{replace, take};
use std::task::{Context, Poll};

use futures::FutureExt;
use itertools::Itertools;
use prometheus::{HistogramTimer, IntCounter};
use risingwave_common::catalog::DatabaseId;
use risingwave_common::id::JobId;
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
use crate::barrier::checkpoint::{BarrierWorkerState, CheckpointControl};
use crate::barrier::complete_task::BarrierCompleteOutput;
use crate::barrier::context::recovery::RenderedDatabaseRuntimeInfo;
use crate::barrier::rpc::{
    ControlStreamManager, DatabaseInitialBarrierCollector, from_partial_graph_id,
    to_partial_graph_id,
};
use crate::barrier::worker::{
    RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};

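/// Tracks an in-flight partial-graph reset: the workers that have not yet acknowledged the
/// reset, and the responses received so far.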
#[derive(Default, Debug)]
pub(super) struct ResetPartialGraphCollector {
    pub(super) remaining_workers: HashSet<WorkerId>,
    pub(super) reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
}

impl ResetPartialGraphCollector {
    pub(super) fn collect(&mut self, worker_id: WorkerId, resp: ResetPartialGraphResponse) {
        assert!(self.remaining_workers.remove(&worker_id));
        self.reset_resps
            .try_insert(worker_id, resp)
            .expect("non-duplicate");
    }
}

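/// Stage of a database that is undergoing per-database recovery.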
enum DatabaseRecoveringStage {
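    /// Waiting for all workers to acknowledge the partial-graph resets, and for the retry
    /// backoff to elapse, before the initial barrier is injected again.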
    Resetting {
        database_resp_collector: ResetPartialGraphCollector,
        creating_job_collectors: HashMap<JobId, ResetPartialGraphCollector>,
        backoff_future: Option<RetryBackoffFuture>,
    },
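    /// The initial barrier has been injected and is being collected from the workers.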
    Initializing {
        initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
    },
}

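/// Recovery state machine of a single database: `Resetting` -> `Initializing` -> running.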
pub(crate) struct DatabaseRecoveringState {
    stage: DatabaseRecoveringStage,
    retry_backoff_strategy: RetryBackoffStrategy,
    metrics: DatabaseRecoveryMetrics,
}

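/// Stage transition produced by [`DatabaseRecoveringState::poll_next_event`].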
pub(super) enum RecoveringStateAction {
    EnterInitializing(Vec<(WorkerId, ResetPartialGraphResponse)>),
    EnterRunning,
}

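/// Per-database recovery metrics: a failure counter and a latency timer that is observed once
/// the database re-enters the running state.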
struct DatabaseRecoveryMetrics {
    recovery_failure_cnt: IntCounter,
    recovery_timer: Option<HistogramTimer>,
}

impl DatabaseRecoveryMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = format!("database {}", database_id);
        Self {
            recovery_failure_cnt: GLOBAL_META_METRICS
                .recovery_failure_cnt
                .with_label_values(&[database_id_str.as_str()]),
            recovery_timer: Some(
                GLOBAL_META_METRICS
                    .recovery_latency
                    .with_label_values(&[database_id_str.as_str()])
                    .start_timer(),
            ),
        }
    }
}

impl DatabaseRecoveringState {
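    /// Broadcast partial-graph resets for the database graph and every creating-job graph,
    /// returning one response collector for the database and one per creating job.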
    fn reset_database_partial_graphs(
        database_id: DatabaseId,
        creating_jobs: impl Iterator<Item = JobId>,
        control_stream_manager: &mut ControlStreamManager,
    ) -> (
        ResetPartialGraphCollector,
        HashMap<JobId, ResetPartialGraphCollector>,
    ) {
        let creating_jobs = creating_jobs.collect_vec();
        let remaining_workers = control_stream_manager.reset_partial_graphs(
            creating_jobs
                .iter()
                .copied()
                .map(Some)
                .chain([None])
                .map(|creating_job_id| to_partial_graph_id(database_id, creating_job_id))
                .collect(),
        );
        (
            ResetPartialGraphCollector {
                remaining_workers: remaining_workers.clone(),
                reset_resps: Default::default(),
            },
            creating_jobs
                .into_iter()
                .map(|job_id| {
                    (
                        job_id,
                        ResetPartialGraphCollector {
                            remaining_workers: remaining_workers.clone(),
                            reset_resps: Default::default(),
                        },
                    )
                })
                .collect(),
        )
    }

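    /// Create a recovering state that starts in the `Resetting` stage with a fresh retry
    /// backoff strategy.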
    pub(super) fn new_resetting(
        database_id: DatabaseId,
        creating_jobs: impl Iterator<Item = JobId>,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Self {
        let mut retry_backoff_strategy = get_retry_backoff_strategy();
        let backoff_future = retry_backoff_strategy.next().unwrap();
        let metrics = DatabaseRecoveryMetrics::new(database_id);
        metrics.recovery_failure_cnt.inc();
        let (database_resp_collector, creating_job_collectors) =
            Self::reset_database_partial_graphs(database_id, creating_jobs, control_stream_manager);
        Self {
            stage: DatabaseRecoveringStage::Resetting {
                database_resp_collector,
                creating_job_collectors,
                backoff_future: Some(backoff_future),
            },
            retry_backoff_strategy,
            metrics,
        }
    }

    fn next_retry(&mut self) -> RetryBackoffFuture {
        self.retry_backoff_strategy
            .next()
            .expect("should not be empty")
    }

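    /// Handle a barrier-complete response received while the database is recovering.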
    pub(super) fn barrier_collected(
        &mut self,
        database_id: DatabaseId,
        resp: BarrierCompleteResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting { .. } => {
                // Barriers collected while the database is still resetting are ignored.
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
            } => {
                let worker_id = resp.worker_id;
                initial_barrier_collector.collect_resp(resp);
                info!(
                    ?database_id,
                    %worker_id,
                    remaining_workers = ?initial_barrier_collector,
                    "initializing database barrier collected"
                );
            }
        }
    }

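    /// Adjust the state after a worker error and return whether the current stage is still valid.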
    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                database_resp_collector,
                creating_job_collectors,
                ..
            } => {
                for collector in creating_job_collectors
                    .values_mut()
                    .chain([database_resp_collector])
                {
                    collector.remaining_workers.remove(&worker_id);
                }
                true
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
        }
    }

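    /// Record a reset response from a worker for the database graph or one of its creating jobs.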
    pub(super) fn on_reset_partial_graph_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetPartialGraphResponse,
    ) {
        let (database_id, creating_job_id) = from_partial_graph_id(resp.partial_graph_id);
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                database_resp_collector,
                creating_job_collectors,
                ..
            } => {
                let collector = if let Some(creating_job_id) = creating_job_id {
                    let Some(collector) = creating_job_collectors.get_mut(&creating_job_id) else {
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job: {resp:?}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            %worker_id,
                            ?resp,
                            "ignore reset partial graph resp on non-existing creating job"
                        );
                        return;
                    };
                    collector
                } else {
                    database_resp_collector
                };
                collector.collect(worker_id, resp);
            }
            DatabaseRecoveringStage::Initializing { .. } => {
                unreachable!("all reset resp should have been received in Resetting")
            }
        }
    }

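    /// Poll for the next stage transition: `EnterInitializing` once the backoff has elapsed and
    /// all reset responses are collected, or `EnterRunning` once the initial barrier is collected.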
    pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                database_resp_collector,
                creating_job_collectors,
                backoff_future: backoff_future_option,
                ..
            } => {
                let pass_backoff = if let Some(backoff_future) = backoff_future_option {
                    if backoff_future.poll_unpin(cx).is_ready() {
                        *backoff_future_option = None;
                        true
                    } else {
                        false
                    }
                } else {
                    true
                };
                if pass_backoff
                    && database_resp_collector.remaining_workers.is_empty()
                    && creating_job_collectors
                        .values()
                        .all(|collector| collector.remaining_workers.is_empty())
                {
                    return Poll::Ready(RecoveringStateAction::EnterInitializing(
                        creating_job_collectors
                            .values_mut()
                            .chain([database_resp_collector])
                            .flat_map(|collector| take(&mut collector.reset_resps))
                            .collect(),
                    ));
                }
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => {
                if initial_barrier_collector.is_collected() {
                    return Poll::Ready(RecoveringStateAction::EnterRunning);
                }
            }
        }
        Poll::Pending
    }

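    /// The barrier worker state and creating jobs, available only in the `Initializing` stage.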
    pub(super) fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match &self.stage {
            DatabaseRecoveringStage::Resetting { .. } => None,
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => Some(initial_barrier_collector.database_state()),
        }
    }
}

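/// A pending status transition of a database, carrying the action payload `A`.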
pub(crate) struct DatabaseStatusAction<'a, A> {
    control: &'a mut CheckpointControl,
    database_id: DatabaseId,
    pub(crate) action: A,
}

impl<A> DatabaseStatusAction<'_, A> {
    pub(crate) fn database_id(&self) -> DatabaseId {
        self.database_id
    }
}

impl CheckpointControl {
    pub(super) fn new_database_status_action<A>(
        &mut self,
        database_id: DatabaseId,
        action: A,
    ) -> DatabaseStatusAction<'_, A> {
        DatabaseStatusAction {
            control: self,
            database_id,
            action,
        }
    }
}

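/// Action: move a database into the `Resetting` recovery stage.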
pub(crate) struct EnterReset;

impl DatabaseStatusAction<'_, EnterReset> {
    pub(crate) fn enter(
        self,
        barrier_complete_output: Option<BarrierCompleteOutput>,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        if let Some(output) = barrier_complete_output {
            self.control.ack_completed(output);
        }
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                let mut resetting_job_collectors = HashMap::new();
                let (database_resp_collector, mut creating_job_collectors) =
                    DatabaseRecoveringState::reset_database_partial_graphs(
                        self.database_id,
                        database.creating_streaming_job_controls.drain().filter_map(
                            |(job_id, job)| {
                                if let Some(collector) = job.reset() {
                                    resetting_job_collectors.insert(job_id, collector);
                                    None
                                } else {
                                    Some(job_id)
                                }
                            },
                        ),
                        control_stream_manager,
                    );
                creating_job_collectors.extend(resetting_job_collectors);
                let metrics = DatabaseRecoveryMetrics::new(self.database_id);
                event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                    EventRecovery::database_recovery_start(self.database_id.as_raw_id()),
                )]);
                *database_status =
                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
                        stage: DatabaseRecoveringStage::Resetting {
                            database_resp_collector,
                            creating_job_collectors,
                            backoff_future: None,
                        },
                        retry_backoff_strategy: get_retry_backoff_strategy(),
                        metrics,
                    });
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match &mut state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    unreachable!("should not enter resetting again")
                }
                DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector,
                } => {
                    let creating_jobs = initial_barrier_collector.creating_job_ids().collect_vec();
                    event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                        EventRecovery::database_recovery_failure(self.database_id.as_raw_id()),
                    )]);
                    let backoff_future = state.next_retry();
                    let (database_resp_collector, creating_job_collectors) =
                        DatabaseRecoveringState::reset_database_partial_graphs(
                            self.database_id,
                            creating_jobs.into_iter(),
                            control_stream_manager,
                        );
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        database_resp_collector,
                        creating_job_collectors,
                        backoff_future: Some(backoff_future),
                    };
                }
            },
        }
    }
}

impl CheckpointControl {
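    /// Handle a reported failure on a database. Returns an `EnterReset` action if the database
    /// is currently running; otherwise the recovering state is adjusted in place.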
    pub(crate) fn on_report_failure(
        &mut self,
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                Some(self.new_database_status_action(database_id, EnterReset))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match &mut state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    // Already resetting; the failure is absorbed by the ongoing reset.
                    None
                }
                DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector,
                } => {
                    warn!(database_id = %database_id, "failed to initialize database");
                    let creating_jobs = initial_barrier_collector.creating_job_ids().collect_vec();
                    let backoff_future = state.next_retry();
                    let (database_resp_collector, creating_job_collectors) =
                        DatabaseRecoveringState::reset_database_partial_graphs(
                            database_id,
                            creating_jobs.into_iter(),
                            control_stream_manager,
                        );
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        database_resp_collector,
                        creating_job_collectors,
                        backoff_future: Some(backoff_future),
                    };
                    None
                }
            },
        }
    }
}

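/// Action: move a resetting database into the `Initializing` stage, carrying the collected
/// reset responses.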
pub(crate) struct EnterInitializing(pub(crate) Vec<(WorkerId, ResetPartialGraphResponse)>);

impl DatabaseStatusAction<'_, EnterInitializing> {
    pub(crate) fn enter(
        self,
        runtime_info: DatabaseRuntimeInfoSnapshot,
        rendered_info: RenderedDatabaseRuntimeInfo,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        let DatabaseRuntimeInfoSnapshot {
            recovery_context,
            mut state_table_committed_epochs,
            mut state_table_log_epochs,
            mut mv_depended_subscriptions,
            mut background_jobs,
            mut cdc_table_snapshot_splits,
        } = runtime_info;
        let fragment_relations = &recovery_context.fragment_relations;
        let RenderedDatabaseRuntimeInfo {
            job_infos,
            stream_actors,
            mut source_splits,
        } = rendered_info;
        let mut injected_creating_jobs = HashSet::new();
        let result: MetaResult<_> = try {
            control_stream_manager.inject_database_initial_barrier(
                self.database_id,
                job_infos,
                &recovery_context.job_extra_info,
                &mut state_table_committed_epochs,
                &mut state_table_log_epochs,
                fragment_relations,
                &stream_actors,
                &mut source_splits,
                &mut background_jobs,
                &mut mv_depended_subscriptions,
                false,
                &self.control.hummock_version_stats,
                &mut cdc_table_snapshot_splits,
                &mut injected_creating_jobs,
            )?
        };
        match result {
            Ok(initial_barrier_collector) => {
                info!(node_to_collect = ?initial_barrier_collector, database_id = ?self.database_id, "database enter initializing");
                status.stage = DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector: initial_barrier_collector.into(),
                };
            }
            Err(e) => {
                warn!(
                    database_id = %self.database_id,
                    e = %e.as_report(),
                    "failed to inject initial barrier"
                );
                let backoff_future = status.next_retry();
                let (database_resp_collector, creating_job_collectors) =
                    DatabaseRecoveringState::reset_database_partial_graphs(
                        self.database_id,
                        injected_creating_jobs.into_iter(),
                        control_stream_manager,
                    );
                status.metrics.recovery_failure_cnt.inc();
                status.stage = DatabaseRecoveringStage::Resetting {
                    database_resp_collector,
                    creating_job_collectors,
                    backoff_future: Some(backoff_future),
                };
            }
        }
    }

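    /// Handle a failure to reload the database runtime info: stay in `Resetting` and retry
    /// after backoff.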
    pub(crate) fn fail_reload_runtime_info(self, e: MetaError) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        warn!(
            database_id = %self.database_id,
            e = %e.as_report(),
            "failed to reload runtime info"
        );
        let backoff_future = status.next_retry();
        status.metrics.recovery_failure_cnt.inc();
        status.stage = DatabaseRecoveringStage::Resetting {
            database_resp_collector: Default::default(),
            creating_job_collectors: Default::default(),
            backoff_future: Some(backoff_future),
        };
    }

    pub(crate) fn remove(self) {
        self.control
            .databases
            .remove(&self.database_id)
            .expect("should exist");
        self.control
            .env
            .shared_actor_infos()
            .remove_database(self.database_id);
    }
}

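/// Action: move a fully initialized database back to the running state.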
pub(crate) struct EnterRunning;

impl DatabaseStatusAction<'_, EnterRunning> {
    pub(crate) fn enter(self) {
        info!(database_id = ?self.database_id, "database enter running");
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::database_recovery_success(self.database_id.as_raw_id()),
        )]);
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter running again")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                let temp_place_holder = DatabaseRecoveringStage::Resetting {
                    database_resp_collector: Default::default(),
                    creating_job_collectors: Default::default(),
                    backoff_future: None,
                };
                match state.metrics.recovery_timer.take() {
                    Some(recovery_timer) => {
                        recovery_timer.observe_duration();
                    }
                    _ => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "take database {} recovery latency twice",
                                self.database_id
                            )
                        } else {
                            warn!(database_id = %self.database_id, "failed to take recovery latency")
                        }
                    }
                }
                match replace(&mut state.stage, temp_place_holder) {
                    DatabaseRecoveringStage::Resetting { .. } => {
                        unreachable!("can only enter running during initializing")
                    }
                    DatabaseRecoveringStage::Initializing {
                        initial_barrier_collector,
                    } => {
                        *database_status = DatabaseCheckpointControlStatus::Running(
                            initial_barrier_collector.finish(),
                        );
                    }
                }
            }
        }
    }
}