risingwave_meta/barrier/checkpoint/recovery.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::mem::{replace, take};
use std::task::{Context, Poll};

use futures::FutureExt;
use itertools::Itertools;
use prometheus::{HistogramTimer, IntCounter};
use risingwave_common::catalog::DatabaseId;
use risingwave_common::id::JobId;
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
use crate::barrier::checkpoint::{BarrierWorkerState, CheckpointControl};
use crate::barrier::complete_task::BarrierCompleteOutput;
use crate::barrier::context::recovery::RenderedDatabaseRuntimeInfo;
use crate::barrier::rpc::{
    ControlStreamManager, DatabaseInitialBarrierCollector, from_partial_graph_id,
    to_partial_graph_id,
};
use crate::barrier::worker::{
    RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};

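/// Tracks an in-flight reset of one partial graph: the workers that have not yet
/// acknowledged the reset, and the responses collected from them so far.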
#[derive(Default, Debug)]
pub(super) struct ResetPartialGraphCollector {
    pub(super) remaining_workers: HashSet<WorkerId>,
    pub(super) reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
}

impl ResetPartialGraphCollector {
    pub(super) fn collect(&mut self, worker_id: WorkerId, resp: ResetPartialGraphResponse) {
        assert!(self.remaining_workers.remove(&worker_id));
        self.reset_resps
            .try_insert(worker_id, resp)
            .expect("non-duplicate");
    }
}

/// We can treat each database as a state machine with 3 states: `Running`, `Resetting` and `Initializing`.
/// A state transition is triggered upon receiving one of 3 response variants: `ReportDatabaseFailure`, `BarrierComplete` and `DatabaseReset`.
/// The transition logic can be summarized as follows:
///
/// `Running`
///     - on `ReportDatabaseFailure`
///         - wait for the in-flight `BarrierCompletingTask` to finish, if any, and mark the database as blocked in the command queue
///         - send `ResetDatabaseRequest` to all CNs
///         - enter the `Resetting` state
///     - on `BarrierComplete`: update the `DatabaseCheckpointControl`
///     - on `DatabaseReset`: unreachable
/// `Resetting`
///     - on `ReportDatabaseFailure` or `BarrierComplete`: ignore
///     - on `DatabaseReset`:
///         - mark the CN's response as collected
///         - once responses from all CNs have been collected:
///             - load the database runtime info from the catalog manager and fragment manager
///             - inject the initial barrier to the CNs, and save the set of nodes whose responses are yet to be collected
///             - if any failure happens, re-enter the `Resetting` state
///             - otherwise, enter the `Initializing` state
/// `Initializing`
///     - on `BarrierComplete`:
///         - mark the CN's response as collected
///         - once responses from all CNs have been collected: enter `Running`
///     - on `ReportDatabaseFailure`
///         - send `ResetDatabaseRequest` to all CNs
///         - enter `Resetting`
///     - on `DatabaseReset`: unreachable
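///
/// The same transitions, sketched as a diagram:
///
/// ```text
///            ReportDatabaseFailure                 all resets collected,
/// Running --------------------------> Resetting ------------------------> Initializing
///    ^                                    ^       initial barrier injected      |
///    |                                    |    failure / ReportDatabaseFailure  |
///    |                                    +--------------------------------------+
///    +------------ all initial barriers collected (BarrierComplete) ------------+
/// ```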
enum DatabaseRecoveringStage {
    Resetting {
        database_resp_collector: ResetPartialGraphCollector,
        creating_job_collectors: HashMap<JobId, ResetPartialGraphCollector>,
        backoff_future: Option<RetryBackoffFuture>,
    },
    Initializing {
        initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
    },
}

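/// Recovery state of a single database: the current recovery stage, plus the retry
/// backoff strategy and metrics that span the whole recovery attempt.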
pub(crate) struct DatabaseRecoveringState {
    stage: DatabaseRecoveringStage,
    retry_backoff_strategy: RetryBackoffStrategy,
    metrics: DatabaseRecoveryMetrics,
}

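/// The next transition to apply once `poll_next_event` resolves: either all reset
/// responses have been collected and the database may enter `Initializing`, or the
/// initial barrier has been fully collected and it may enter `Running`.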
pub(super) enum RecoveringStateAction {
    EnterInitializing(Vec<(WorkerId, ResetPartialGraphResponse)>),
    EnterRunning,
}

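/// Per-database recovery metrics: a failure counter, and a latency timer that is
/// observed once the database re-enters `Running`.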
struct DatabaseRecoveryMetrics {
    recovery_failure_cnt: IntCounter,
    recovery_timer: Option<HistogramTimer>,
}

impl DatabaseRecoveryMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = format!("database {}", database_id);
        Self {
            recovery_failure_cnt: GLOBAL_META_METRICS
                .recovery_failure_cnt
                .with_label_values(&[database_id_str.as_str()]),
            recovery_timer: Some(
                GLOBAL_META_METRICS
                    .recovery_latency
                    .with_label_values(&[database_id_str.as_str()])
                    .start_timer(),
            ),
        }
    }
}

impl DatabaseRecoveringState {
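    /// Ask the control stream manager to reset the database's own partial graph and the
    /// partial graphs of all its creating jobs, returning one collector for the database
    /// plus one per creating job, each waiting on the same set of workers.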
    fn reset_database_partial_graphs(
        database_id: DatabaseId,
        creating_jobs: impl Iterator<Item = JobId>,
        control_stream_manager: &mut ControlStreamManager,
    ) -> (
        ResetPartialGraphCollector,
        HashMap<JobId, ResetPartialGraphCollector>,
    ) {
        let creating_jobs = creating_jobs.collect_vec();
        let remaining_workers = control_stream_manager.reset_partial_graphs(
            creating_jobs
                .iter()
                .copied()
                .map(Some)
                .chain([None])
                .map(|creating_job_id| to_partial_graph_id(database_id, creating_job_id))
                .collect(),
        );
        (
            ResetPartialGraphCollector {
                remaining_workers: remaining_workers.clone(),
                reset_resps: Default::default(),
            },
            creating_jobs
                .into_iter()
                .map(|job_id| {
                    (
                        job_id,
                        ResetPartialGraphCollector {
                            remaining_workers: remaining_workers.clone(),
                            reset_resps: Default::default(),
                        },
                    )
                })
                .collect(),
        )
    }

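    /// Create a recovery state that starts directly in `Resetting`, with the first
    /// backoff already scheduled and the failure counter bumped.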
    pub(super) fn new_resetting(
        database_id: DatabaseId,
        creating_jobs: impl Iterator<Item = JobId>,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Self {
        let mut retry_backoff_strategy = get_retry_backoff_strategy();
        let backoff_future = retry_backoff_strategy.next().unwrap();
        let metrics = DatabaseRecoveryMetrics::new(database_id);
        metrics.recovery_failure_cnt.inc();
        let (database_resp_collector, creating_job_collectors) =
            Self::reset_database_partial_graphs(database_id, creating_jobs, control_stream_manager);
        Self {
            stage: DatabaseRecoveringStage::Resetting {
                database_resp_collector,
                creating_job_collectors,
                backoff_future: Some(backoff_future),
            },
            retry_backoff_strategy,
            metrics,
        }
    }

    fn next_retry(&mut self) -> RetryBackoffFuture {
        self.retry_backoff_strategy
            .next()
            .expect("should not be empty")
    }

    pub(super) fn barrier_collected(
        &mut self,
        database_id: DatabaseId,
        resp: BarrierCompleteResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting { .. } => {
                // ignore the collected barrier on resetting or backoff
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
            } => {
                let worker_id = resp.worker_id;
                initial_barrier_collector.collect_resp(resp);
                info!(
                    ?database_id,
                    %worker_id,
                    remaining_workers = ?initial_barrier_collector,
                    "initializing database barrier collected"
                );
            }
        }
    }

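    /// Handle a worker failure, returning whether the current stage is still valid:
    /// while `Resetting`, the failed worker is simply dropped from all collectors;
    /// while `Initializing`, the check is delegated to the initial barrier collector.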
    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                database_resp_collector,
                creating_job_collectors,
                ..
            } => {
                for collector in creating_job_collectors
                    .values_mut()
                    .chain([database_resp_collector])
                {
                    collector.remaining_workers.remove(&worker_id);
                }
                true
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
        }
    }

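    /// Route a `ResetPartialGraphResponse` to the collector of the partial graph it
    /// belongs to. Only reachable in the `Resetting` stage.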
    pub(super) fn on_reset_partial_graph_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetPartialGraphResponse,
    ) {
        let (database_id, creating_job_id) = from_partial_graph_id(resp.partial_graph_id);
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                database_resp_collector,
                creating_job_collectors,
                ..
            } => {
                let collector = if let Some(creating_job_id) = creating_job_id {
                    let Some(collector) = creating_job_collectors.get_mut(&creating_job_id) else {
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job: {resp:?}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            %worker_id,
                            ?resp,
                            "ignore reset partial graph resp on non-existing creating job"
                        );
                        return;
                    };
                    collector
                } else {
                    database_resp_collector
                };
                collector.collect(worker_id, resp);
            }
            DatabaseRecoveringStage::Initializing { .. } => {
                unreachable!("all reset resp should have been received in Resetting")
            }
        }
    }

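    /// Poll for the next state transition: in `Resetting`, ready once the backoff has
    /// elapsed and every collector has heard back from all workers; in `Initializing`,
    /// ready once the initial barrier has been collected from all nodes.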
    pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                database_resp_collector,
                creating_job_collectors,
                backoff_future: backoff_future_option,
                ..
            } => {
                let pass_backoff = if let Some(backoff_future) = backoff_future_option {
                    if backoff_future.poll_unpin(cx).is_ready() {
                        *backoff_future_option = None;
                        true
                    } else {
                        false
                    }
                } else {
                    true
                };
                if pass_backoff
                    && database_resp_collector.remaining_workers.is_empty()
                    && creating_job_collectors
                        .values()
                        .all(|collector| collector.remaining_workers.is_empty())
                {
                    return Poll::Ready(RecoveringStateAction::EnterInitializing(
                        creating_job_collectors
                            .values_mut()
                            .chain([database_resp_collector])
                            .flat_map(|collector| take(&mut collector.reset_resps))
                            .collect(),
                    ));
                }
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => {
                if initial_barrier_collector.is_collected() {
                    return Poll::Ready(RecoveringStateAction::EnterRunning);
                }
            }
        }
        Poll::Pending
    }

    pub(super) fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match &self.stage {
            DatabaseRecoveringStage::Resetting { .. } => None,
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => Some(initial_barrier_collector.database_state()),
        }
    }
}

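/// A pending state transition for one database: borrows the `CheckpointControl` and
/// carries the action (`EnterReset`, `EnterInitializing` or `EnterRunning`) to apply.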
pub(crate) struct DatabaseStatusAction<'a, A> {
    control: &'a mut CheckpointControl,
    database_id: DatabaseId,
    pub(crate) action: A,
}

impl<A> DatabaseStatusAction<'_, A> {
    pub(crate) fn database_id(&self) -> DatabaseId {
        self.database_id
    }
}

impl CheckpointControl {
    pub(super) fn new_database_status_action<A>(
        &mut self,
        database_id: DatabaseId,
        action: A,
    ) -> DatabaseStatusAction<'_, A> {
        DatabaseStatusAction {
            control: self,
            database_id,
            action,
        }
    }
}

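/// Action: transition a database into the `Resetting` stage, either from `Running` on
/// the first failure, or from `Initializing` on a failed recovery attempt.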
pub(crate) struct EnterReset;

impl DatabaseStatusAction<'_, EnterReset> {
    pub(crate) fn enter(
        self,
        barrier_complete_output: Option<BarrierCompleteOutput>,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        if let Some(output) = barrier_complete_output {
            self.control.ack_completed(output);
        }
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                let mut resetting_job_collectors = HashMap::new();
                let (database_resp_collector, mut creating_job_collectors) =
                    DatabaseRecoveringState::reset_database_partial_graphs(
                        self.database_id,
                        database.creating_streaming_job_controls.drain().filter_map(
                            |(job_id, job)| {
                                if let Some(collector) = job.reset() {
                                    resetting_job_collectors.insert(job_id, collector);
                                    None
                                } else {
                                    Some(job_id)
                                }
                            },
                        ),
                        control_stream_manager,
                    );
                creating_job_collectors.extend(resetting_job_collectors);
                let metrics = DatabaseRecoveryMetrics::new(self.database_id);
                event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                    EventRecovery::database_recovery_start(self.database_id.as_raw_id()),
                )]);
                *database_status =
                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
                        stage: DatabaseRecoveringStage::Resetting {
                            database_resp_collector,
                            creating_job_collectors,
                            backoff_future: None,
                        },
                        retry_backoff_strategy: get_retry_backoff_strategy(),
                        metrics,
                    });
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match &mut state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    unreachable!("should not enter resetting again")
                }
                DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector,
                } => {
                    let creating_jobs = initial_barrier_collector.creating_job_ids().collect_vec();
                    event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                        EventRecovery::database_recovery_failure(self.database_id.as_raw_id()),
                    )]);
                    let backoff_future = state.next_retry();
                    let (database_resp_collector, creating_job_collectors) =
                        DatabaseRecoveringState::reset_database_partial_graphs(
                            self.database_id,
                            creating_jobs.into_iter(),
                            control_stream_manager,
                        );
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        database_resp_collector,
                        creating_job_collectors,
                        backoff_future: Some(backoff_future),
                    };
                }
            },
        }
    }
}

impl CheckpointControl {
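    /// Handle a `ReportDatabaseFailure` for a database. Returns an `EnterReset` action
    /// when the database is currently `Running`; during `Initializing`, falls back to
    /// `Resetting` in place; during `Resetting`, the report is ignored.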
    pub(crate) fn on_report_failure(
        &mut self,
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                Some(self.new_database_status_action(database_id, EnterReset))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match &mut state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    // ignore reported failure during resetting or backoff.
                    None
                }
                DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector,
                } => {
                    warn!(database_id = %database_id, "failed to initialize database");
                    let creating_jobs = initial_barrier_collector.creating_job_ids().collect_vec();
                    let backoff_future = state.next_retry();
                    let (database_resp_collector, creating_job_collectors) =
                        DatabaseRecoveringState::reset_database_partial_graphs(
                            database_id,
                            creating_jobs.into_iter(),
                            control_stream_manager,
                        );
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        database_resp_collector,
                        creating_job_collectors,
                        backoff_future: Some(backoff_future),
                    };
                    None
                }
            },
        }
    }
}

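/// Action: transition a database from `Resetting` into `Initializing`, carrying the
/// reset responses collected from each worker.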
pub(crate) struct EnterInitializing(pub(crate) Vec<(WorkerId, ResetPartialGraphResponse)>);

impl DatabaseStatusAction<'_, EnterInitializing> {
    pub(crate) fn enter(
        self,
        runtime_info: DatabaseRuntimeInfoSnapshot,
        rendered_info: RenderedDatabaseRuntimeInfo,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        let DatabaseRuntimeInfoSnapshot {
            recovery_context,
            mut state_table_committed_epochs,
            mut state_table_log_epochs,
            mut mv_depended_subscriptions,
            mut background_jobs,
            mut cdc_table_snapshot_splits,
        } = runtime_info;
        let fragment_relations = &recovery_context.fragment_relations;
        let RenderedDatabaseRuntimeInfo {
            job_infos,
            stream_actors,
            mut source_splits,
        } = rendered_info;
        let mut injected_creating_jobs = HashSet::new();
        let result: MetaResult<_> = try {
            control_stream_manager.inject_database_initial_barrier(
                self.database_id,
                job_infos,
                &recovery_context.job_extra_info,
                &mut state_table_committed_epochs,
                &mut state_table_log_epochs,
                fragment_relations,
                &stream_actors,
                &mut source_splits,
                &mut background_jobs,
                &mut mv_depended_subscriptions,
                false,
                &self.control.hummock_version_stats,
                &mut cdc_table_snapshot_splits,
                &mut injected_creating_jobs,
            )?
        };
        match result {
            Ok(initial_barrier_collector) => {
                info!(
                    node_to_collect = ?initial_barrier_collector,
                    database_id = ?self.database_id,
                    "database enter initializing"
                );
                status.stage = DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector: initial_barrier_collector.into(),
                };
            }
            Err(e) => {
                warn!(
                    database_id = %self.database_id,
                    e = %e.as_report(),
                    "failed to inject initial barrier"
                );
                let backoff_future = status.next_retry();
                let (database_resp_collector, creating_job_collectors) =
                    DatabaseRecoveringState::reset_database_partial_graphs(
                        self.database_id,
                        injected_creating_jobs.into_iter(),
                        control_stream_manager,
                    );
                status.metrics.recovery_failure_cnt.inc();
                status.stage = DatabaseRecoveringStage::Resetting {
                    database_resp_collector,
                    creating_job_collectors,
                    backoff_future: Some(backoff_future),
                };
            }
        }
    }

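    /// Called when reloading the database runtime info fails: stay in `Resetting` with
    /// empty collectors and schedule the next retry backoff.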
    pub(crate) fn fail_reload_runtime_info(self, e: MetaError) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        warn!(
            database_id = %self.database_id,
            e = %e.as_report(),
            "failed to reload runtime info"
        );
        let backoff_future = status.next_retry();
        status.metrics.recovery_failure_cnt.inc();
        status.stage = DatabaseRecoveringStage::Resetting {
            database_resp_collector: Default::default(),
            creating_job_collectors: Default::default(),
            backoff_future: Some(backoff_future),
        };
    }

    pub(crate) fn remove(self) {
        self.control
            .databases
            .remove(&self.database_id)
            .expect("should exist");
        self.control
            .env
            .shared_actor_infos()
            .remove_database(self.database_id);
    }
}

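/// Action: transition a database from `Initializing` back to `Running` once the initial
/// barrier has been collected from all nodes.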
pub(crate) struct EnterRunning;

impl DatabaseStatusAction<'_, EnterRunning> {
    pub(crate) fn enter(self) {
        info!(database_id = ?self.database_id, "database enter running");
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::database_recovery_success(self.database_id.as_raw_id()),
        )]);
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter running again")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                let temp_place_holder = DatabaseRecoveringStage::Resetting {
                    database_resp_collector: Default::default(),
                    creating_job_collectors: Default::default(),
                    backoff_future: None,
                };
                match state.metrics.recovery_timer.take() {
                    Some(recovery_timer) => {
                        recovery_timer.observe_duration();
                    }
                    _ => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "recovery latency timer of database {} taken twice",
                                self.database_id
                            )
                        } else {
                            warn!(database_id = %self.database_id, "failed to take recovery latency timer")
                        }
                    }
                }
                match replace(&mut state.stage, temp_place_holder) {
                    DatabaseRecoveringStage::Resetting { .. } => {
                        unreachable!("can only enter running during initializing")
                    }
                    DatabaseRecoveringStage::Initializing {
                        initial_barrier_collector,
                    } => {
                        *database_status = DatabaseCheckpointControlStatus::Running(
                            initial_barrier_collector.finish(),
                        );
                    }
                }
            }
        }
    }
}