risingwave_meta/barrier/checkpoint/
recovery.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::mem::{replace, take};
17use std::task::{Context, Poll};
18
19use futures::FutureExt;
20use prometheus::{HistogramTimer, IntCounter};
21use risingwave_common::catalog::DatabaseId;
22use risingwave_meta_model::WorkerId;
23use risingwave_pb::id::PartialGraphId;
24use risingwave_pb::meta::event_log::{Event, EventRecovery};
25use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
26use thiserror_ext::AsReport;
27use tracing::{info, warn};
28
29use crate::barrier::DatabaseRuntimeInfoSnapshot;
30use crate::barrier::checkpoint::CheckpointControl;
31use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
32use crate::barrier::complete_task::BarrierCompleteOutput;
33use crate::barrier::context::recovery::RenderedDatabaseRuntimeInfo;
34use crate::barrier::partial_graph::PartialGraphManager;
35use crate::barrier::rpc::{DatabaseInitialBarrierCollector, to_partial_graph_id};
36use crate::barrier::worker::{
37    RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
38};
39use crate::rpc::metrics::GLOBAL_META_METRICS;
40use crate::{MetaError, MetaResult};
41
/// We can treat each database as a state machine of 3 states: `Running`, `Resetting` and `Initializing`.
/// The state transition can be triggered when receiving 3 variants of response: `ReportDatabaseFailure`, `BarrierComplete`, `DatabaseReset`.
/// The logic of state transition can be summarized as follows:
///
/// `Running`
///     - on `ReportDatabaseFailure`
///         - wait for the inflight `BarrierCompletingTask` to finish if there is any, mark the database as blocked in command queue
///         - send `ResetDatabaseRequest` to all CNs.
///         - enter `Resetting` state.
///     - on `BarrierComplete`: update the `DatabaseCheckpointControl`.
///     - on `DatabaseReset`: unreachable
/// `Resetting`
///     - on `ReportDatabaseFailure` or `BarrierComplete`: ignore
///     - on `DatabaseReset`:
///         - mark the CN as collected.
///         - when all CNs have collected the response:
///             - load the database runtime info from catalog manager and fragment manager
///             - inject the initial barrier to CNs, save the set of nodes that need to collect response
///             - if any failure happened, re-enter `Resetting` state.
///             - enter `Initializing` state
/// `Initializing`
///     - on `BarrierComplete`:
///         - mark the CN as collected
///         - when all CNs have collected the response: enter Running
///     - on `ReportDatabaseFailure`
///         - send `ResetDatabaseRequest` to all CNs
///         - enter `Resetting`
///     - on `DatabaseReset`: unreachable
enum DatabaseRecoveringStage {
    /// Waiting for the outstanding partial graph resets to be reported and, if a backoff is
    /// pending, for that backoff to elapse.
    Resetting {
        /// Partial graphs whose reset has not been reported yet. An entry is removed on each
        /// reported reset; the stage can only be left once this set is empty.
        resetting_partial_graphs: HashSet<PartialGraphId>,
        /// Reset responses accumulated so far; handed over when leaving this stage.
        reset_resps: Vec<(WorkerId, ResetPartialGraphResponse)>,
        /// Retry backoff that has to complete before leaving this stage. `None` when no backoff
        /// is pending (either none was scheduled or it has already elapsed).
        backoff_future: Option<RetryBackoffFuture>,
    },
    /// The initial barrier has been injected and is being collected.
    Initializing {
        /// Tracks collection of the initial barrier; once it reports collected, the database
        /// can enter running.
        initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
    },
}
80
/// Recovery bookkeeping of a single database that is not in the running state.
pub(crate) struct DatabaseRecoveringState {
    /// Current stage of the recovery state machine.
    stage: DatabaseRecoveringStage,
    /// Source of the backoff futures used each time the recovery has to be retried.
    retry_backoff_strategy: RetryBackoffStrategy,
    /// Failure counter and latency timer of this recovery.
    metrics: DatabaseRecoveryMetrics,
}
86
/// Next transition a recovering database is ready to take, as produced by
/// `DatabaseRecoveringState::poll_next_event`.
pub(super) enum RecoveringStateAction {
    /// Resetting has finished; carries the reset responses gathered during the `Resetting` stage.
    EnterInitializing(Vec<(WorkerId, ResetPartialGraphResponse)>),
    /// The initial barrier has been collected, so the database can go back to running.
    EnterRunning,
}
91
/// Per-database metrics handles used while a database is recovering.
struct DatabaseRecoveryMetrics {
    /// Counter incremented each time a recovery attempt of the database fails.
    recovery_failure_cnt: IntCounter,
    /// Timer started on construction; taken (leaving `None`) and observed when the database
    /// enters running.
    recovery_timer: Option<HistogramTimer>,
}
96
97impl DatabaseRecoveryMetrics {
98    fn new(database_id: DatabaseId) -> Self {
99        let database_id_str = format!("database {}", database_id);
100        Self {
101            recovery_failure_cnt: GLOBAL_META_METRICS
102                .recovery_failure_cnt
103                .with_label_values(&[database_id_str.as_str()]),
104            recovery_timer: Some(
105                GLOBAL_META_METRICS
106                    .recovery_latency
107                    .with_label_values(&[database_id_str.as_str()])
108                    .start_timer(),
109            ),
110        }
111    }
112}
113
114impl DatabaseRecoveringState {
115    pub(super) fn new_resetting(
116        database_id: DatabaseId,
117        resetting_partial_graphs: HashSet<PartialGraphId>,
118    ) -> Self {
119        let mut retry_backoff_strategy = get_retry_backoff_strategy();
120        let backoff_future = retry_backoff_strategy.next().unwrap();
121        let metrics = DatabaseRecoveryMetrics::new(database_id);
122        metrics.recovery_failure_cnt.inc();
123        Self {
124            stage: DatabaseRecoveringStage::Resetting {
125                resetting_partial_graphs,
126                reset_resps: vec![],
127                backoff_future: Some(backoff_future),
128            },
129            retry_backoff_strategy,
130            metrics,
131        }
132    }
133
134    fn next_retry(&mut self) -> RetryBackoffFuture {
135        self.retry_backoff_strategy
136            .next()
137            .expect("should not be empty")
138    }
139
140    pub(super) fn partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
141        match &mut self.stage {
142            DatabaseRecoveringStage::Resetting { .. } => {
143                unreachable!("should not have partial graph initialized when resetting")
144            }
145            DatabaseRecoveringStage::Initializing {
146                initial_barrier_collector,
147            } => {
148                initial_barrier_collector.partial_graph_initialized(partial_graph_id);
149            }
150        }
151    }
152
153    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
154        match &mut self.stage {
155            DatabaseRecoveringStage::Resetting { .. } => true,
156            DatabaseRecoveringStage::Initializing {
157                initial_barrier_collector,
158                ..
159            } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
160        }
161    }
162
163    pub(super) fn on_partial_graph_reset(
164        &mut self,
165        partial_graph_id: PartialGraphId,
166        new_reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
167    ) {
168        match &mut self.stage {
169            DatabaseRecoveringStage::Resetting {
170                resetting_partial_graphs,
171                reset_resps,
172                ..
173            } => {
174                assert!(resetting_partial_graphs.remove(&partial_graph_id));
175                reset_resps.extend(new_reset_resps);
176            }
177            DatabaseRecoveringStage::Initializing { .. } => {
178                unreachable!("all reset resp should have been received in Resetting")
179            }
180        }
181    }
182
183    pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
184        match &mut self.stage {
185            DatabaseRecoveringStage::Resetting {
186                resetting_partial_graphs,
187                reset_resps,
188                backoff_future: backoff_future_option,
189            } => {
190                let pass_backoff = if let Some(backoff_future) = backoff_future_option {
191                    if backoff_future.poll_unpin(cx).is_ready() {
192                        *backoff_future_option = None;
193                        true
194                    } else {
195                        false
196                    }
197                } else {
198                    true
199                };
200                if pass_backoff && resetting_partial_graphs.is_empty() {
201                    return Poll::Ready(RecoveringStateAction::EnterInitializing(take(
202                        reset_resps,
203                    )));
204                }
205            }
206            DatabaseRecoveringStage::Initializing {
207                initial_barrier_collector,
208                ..
209            } => {
210                if initial_barrier_collector.is_collected() {
211                    return Poll::Ready(RecoveringStateAction::EnterRunning);
212                }
213            }
214        }
215        Poll::Pending
216    }
217}
218
/// A pending status transition `A` of one database, holding exclusive access to the
/// `CheckpointControl` it will be applied to.
pub(crate) struct DatabaseStatusAction<'a, A> {
    /// Checkpoint control owning the status of the database.
    control: &'a mut CheckpointControl,
    /// Database the action applies to.
    database_id: DatabaseId,
    /// The action itself, together with any payload it carries.
    pub(crate) action: A,
}
224
225impl<A> DatabaseStatusAction<'_, A> {
226    pub(crate) fn database_id(&self) -> DatabaseId {
227        self.database_id
228    }
229}
230
231impl CheckpointControl {
232    pub(super) fn new_database_status_action<A>(
233        &mut self,
234        database_id: DatabaseId,
235        action: A,
236    ) -> DatabaseStatusAction<'_, A> {
237        DatabaseStatusAction {
238            control: self,
239            database_id,
240            action,
241        }
242    }
243}
244
/// Action marker for moving a database into the `Resetting` stage.
pub(crate) struct EnterReset;
246
247impl DatabaseStatusAction<'_, EnterReset> {
248    pub(crate) fn enter(
249        self,
250        barrier_complete_output: Option<BarrierCompleteOutput>,
251        partial_graph_manager: &mut PartialGraphManager,
252    ) {
253        let event_log_manager_ref = self.control.env.event_log_manager_ref();
254        if let Some(output) = barrier_complete_output {
255            self.control.ack_completed(output);
256        }
257        let database_status = self
258            .control
259            .databases
260            .get_mut(&self.database_id)
261            .expect("should exist");
262        match database_status {
263            DatabaseCheckpointControlStatus::Running(database) => {
264                let mut resetting_partial_graphs = HashSet::new();
265                let new_reset_partial_graphs: HashSet<_> = database
266                    .creating_streaming_job_controls
267                    .drain()
268                    .filter_map(|(job_id, job)| {
269                        let partial_graph_id = to_partial_graph_id(self.database_id, Some(job_id));
270                        if job.reset() {
271                            resetting_partial_graphs.insert(partial_graph_id);
272                            None
273                        } else {
274                            Some(partial_graph_id)
275                        }
276                    })
277                    .chain([to_partial_graph_id(self.database_id, None)])
278                    .collect();
279                resetting_partial_graphs
280                    .iter()
281                    .for_each(|partial_graph_id| {
282                        partial_graph_manager.assert_resetting(*partial_graph_id)
283                    });
284                partial_graph_manager
285                    .reset_partial_graphs(new_reset_partial_graphs.iter().copied());
286                resetting_partial_graphs.extend(new_reset_partial_graphs);
287
288                let metrics = DatabaseRecoveryMetrics::new(self.database_id);
289                event_log_manager_ref.add_event_logs(vec![Event::Recovery(
290                    EventRecovery::database_recovery_start(self.database_id.as_raw_id()),
291                )]);
292                *database_status =
293                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
294                        stage: DatabaseRecoveringStage::Resetting {
295                            resetting_partial_graphs,
296                            reset_resps: vec![],
297                            backoff_future: None,
298                        },
299                        retry_backoff_strategy: get_retry_backoff_strategy(),
300                        metrics,
301                    });
302            }
303            DatabaseCheckpointControlStatus::Recovering(state) => match &mut state.stage {
304                DatabaseRecoveringStage::Resetting { .. } => {
305                    unreachable!("should not enter resetting again")
306                }
307                DatabaseRecoveringStage::Initializing {
308                    initial_barrier_collector,
309                } => {
310                    let partial_graphs: HashSet<_> =
311                        initial_barrier_collector.all_partial_graphs().collect();
312                    event_log_manager_ref.add_event_logs(vec![Event::Recovery(
313                        EventRecovery::database_recovery_failure(self.database_id.as_raw_id()),
314                    )]);
315                    let backoff_future = state.next_retry();
316                    partial_graph_manager.reset_partial_graphs(partial_graphs.iter().copied());
317                    state.metrics.recovery_failure_cnt.inc();
318                    state.stage = DatabaseRecoveringStage::Resetting {
319                        resetting_partial_graphs: partial_graphs,
320                        reset_resps: vec![],
321                        backoff_future: Some(backoff_future),
322                    };
323                }
324            },
325        }
326    }
327}
328
329impl CheckpointControl {
330    pub(crate) fn on_report_failure(
331        &mut self,
332        database_id: DatabaseId,
333        partial_graph_manager: &mut PartialGraphManager,
334    ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
335        let database_status = self.databases.get_mut(&database_id).expect("should exist");
336        match database_status {
337            DatabaseCheckpointControlStatus::Running(_) => {
338                Some(self.new_database_status_action(database_id, EnterReset))
339            }
340            DatabaseCheckpointControlStatus::Recovering(state) => match &mut state.stage {
341                DatabaseRecoveringStage::Resetting { .. } => {
342                    // ignore reported failure during resetting or backoff.
343                    None
344                }
345                DatabaseRecoveringStage::Initializing {
346                    initial_barrier_collector,
347                } => {
348                    warn!(database_id = %database_id, "failed to initialize database");
349                    let partial_graphs: HashSet<_> =
350                        initial_barrier_collector.all_partial_graphs().collect();
351                    let backoff_future = state.next_retry();
352                    partial_graph_manager.reset_partial_graphs(partial_graphs.iter().copied());
353                    state.metrics.recovery_failure_cnt.inc();
354                    state.stage = DatabaseRecoveringStage::Resetting {
355                        resetting_partial_graphs: partial_graphs,
356                        reset_resps: vec![],
357                        backoff_future: Some(backoff_future),
358                    };
359                    None
360                }
361            },
362        }
363    }
364}
365
/// Action marker for moving a database from the `Resetting` stage into `Initializing`.
///
/// The payload is a list of per-worker reset responses — presumably the ones gathered during
/// the `Resetting` stage; confirm against the caller.
pub(crate) struct EnterInitializing(pub(crate) Vec<(WorkerId, ResetPartialGraphResponse)>);
367
368impl DatabaseStatusAction<'_, EnterInitializing> {
369    pub(crate) fn enter(
370        self,
371        runtime_info: DatabaseRuntimeInfoSnapshot,
372        rendered_info: RenderedDatabaseRuntimeInfo,
373        partial_graph_manager: &mut PartialGraphManager,
374    ) {
375        let database_status = self
376            .control
377            .databases
378            .get_mut(&self.database_id)
379            .expect("should exist");
380        let status = match database_status {
381            DatabaseCheckpointControlStatus::Running(_) => {
382                unreachable!("should not enter initializing when running")
383            }
384            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
385                DatabaseRecoveringStage::Initializing { .. } => {
386                    unreachable!("can only enter initializing when resetting")
387                }
388                DatabaseRecoveringStage::Resetting { .. } => state,
389            },
390        };
391        let DatabaseRuntimeInfoSnapshot {
392            recovery_context,
393            mut state_table_committed_epochs,
394            mut state_table_log_epochs,
395            mut mv_depended_subscriptions,
396            mut background_jobs,
397            mut cdc_table_snapshot_splits,
398        } = runtime_info;
399        let fragment_relations = &recovery_context.fragment_relations;
400        let RenderedDatabaseRuntimeInfo {
401            job_infos,
402            stream_actors,
403            mut source_splits,
404        } = rendered_info;
405        let mut recoverer = partial_graph_manager.start_recover();
406        let result: MetaResult<_> = try {
407            recoverer.inject_database_initial_barrier(
408                self.database_id,
409                job_infos,
410                &recovery_context.job_extra_info,
411                &mut state_table_committed_epochs,
412                &mut state_table_log_epochs,
413                fragment_relations,
414                &stream_actors,
415                &mut source_splits,
416                &mut background_jobs,
417                &mut mv_depended_subscriptions,
418                false,
419                &self.control.hummock_version_stats,
420                &mut cdc_table_snapshot_splits,
421            )?
422        };
423        match result {
424            Ok(database) => {
425                let initializing_partial_graphs = recoverer.all_initializing();
426                info!(?initializing_partial_graphs, database_id = ?self.database_id, "database enter initializing");
427                status.stage = DatabaseRecoveringStage::Initializing {
428                    initial_barrier_collector: DatabaseInitialBarrierCollector {
429                        database_id: self.database_id,
430                        initializing_partial_graphs,
431                        database,
432                    }
433                    .into(),
434                };
435            }
436            Err(e) => {
437                warn!(
438                    database_id = %self.database_id,
439                    e = %e.as_report(),
440                    "failed to inject initial barrier"
441                );
442                let backoff_future = status.next_retry();
443                let resetting_partial_graphs = recoverer.failed();
444                status.metrics.recovery_failure_cnt.inc();
445                status.stage = DatabaseRecoveringStage::Resetting {
446                    resetting_partial_graphs,
447                    reset_resps: vec![],
448                    backoff_future: Some(backoff_future),
449                };
450            }
451        }
452    }
453
454    pub(crate) fn fail_reload_runtime_info(self, e: MetaError) {
455        let database_status = self
456            .control
457            .databases
458            .get_mut(&self.database_id)
459            .expect("should exist");
460        let status = match database_status {
461            DatabaseCheckpointControlStatus::Running(_) => {
462                unreachable!("should not enter initializing when running")
463            }
464            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
465                DatabaseRecoveringStage::Initializing { .. } => {
466                    unreachable!("can only enter initializing when resetting")
467                }
468                DatabaseRecoveringStage::Resetting { .. } => state,
469            },
470        };
471        warn!(
472            database_id = %self.database_id,
473            e = %e.as_report(),
474            "failed to reload runtime info"
475        );
476        let backoff_future = status.next_retry();
477        status.metrics.recovery_failure_cnt.inc();
478        status.stage = DatabaseRecoveringStage::Resetting {
479            resetting_partial_graphs: HashSet::new(),
480            reset_resps: vec![],
481            backoff_future: Some(backoff_future),
482        };
483    }
484
485    pub(crate) fn remove(self) {
486        self.control
487            .databases
488            .remove(&self.database_id)
489            .expect("should exist");
490        self.control
491            .env
492            .shared_actor_infos()
493            .remove_database(self.database_id);
494    }
495}
496
/// Action marker for moving a database from the `Initializing` stage back to running.
pub(crate) struct EnterRunning;
498
499impl DatabaseStatusAction<'_, EnterRunning> {
500    pub(crate) fn enter(self) {
501        info!(database_id = ?self.database_id, "database enter running");
502        let event_log_manager_ref = self.control.env.event_log_manager_ref();
503        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
504            EventRecovery::database_recovery_success(self.database_id.as_raw_id()),
505        )]);
506        let database_status = self
507            .control
508            .databases
509            .get_mut(&self.database_id)
510            .expect("should exist");
511        match database_status {
512            DatabaseCheckpointControlStatus::Running(_) => {
513                unreachable!("should not enter running again")
514            }
515            DatabaseCheckpointControlStatus::Recovering(state) => {
516                let temp_place_holder = DatabaseRecoveringStage::Resetting {
517                    resetting_partial_graphs: Default::default(),
518                    reset_resps: vec![],
519                    backoff_future: None,
520                };
521                match state.metrics.recovery_timer.take() {
522                    Some(recovery_timer) => {
523                        recovery_timer.observe_duration();
524                    }
525                    _ => {
526                        if cfg!(debug_assertions) {
527                            panic!(
528                                "take database {} recovery latency for twice",
529                                self.database_id
530                            )
531                        } else {
532                            warn!(database_id = %self.database_id,"failed to take recovery latency")
533                        }
534                    }
535                }
536                match replace(&mut state.stage, temp_place_holder) {
537                    DatabaseRecoveringStage::Resetting { .. } => {
538                        unreachable!("can only enter running during initializing")
539                    }
540                    DatabaseRecoveringStage::Initializing {
541                        initial_barrier_collector,
542                    } => {
543                        *database_status = DatabaseCheckpointControlStatus::Running(
544                            initial_barrier_collector.finish(),
545                        );
546                    }
547                }
548            }
549        }
550    }
551}