risingwave_meta/barrier/checkpoint/
recovery.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::mem::{replace, take};
17use std::task::{Context, Poll};
18
19use futures::FutureExt;
20use prometheus::{HistogramTimer, IntCounter};
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_meta_model::WorkerId;
23use risingwave_pb::meta::event_log::{Event, EventRecovery};
24use risingwave_pb::stream_service::BarrierCompleteResponse;
25use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
26use thiserror_ext::AsReport;
27use tracing::{info, warn};
28
29use crate::MetaResult;
30use crate::barrier::DatabaseRuntimeInfoSnapshot;
31use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
32use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
33use crate::barrier::checkpoint::{BarrierWorkerState, CheckpointControl};
34use crate::barrier::complete_task::BarrierCompleteOutput;
35use crate::barrier::edge_builder::FragmentEdgeBuilder;
36use crate::barrier::rpc::{ControlStreamManager, DatabaseInitialBarrierCollector};
37use crate::barrier::worker::{
38    RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
39};
40use crate::rpc::metrics::GLOBAL_META_METRICS;
41
/// We can treat each database as a state machine of 3 states: `Running`, `Resetting` and `Initializing`.
/// The state transition can be triggered when receiving 3 variants of response: `ReportDatabaseFailure`, `BarrierComplete`, `DatabaseReset`.
/// The logic of state transition can be summarized as followed:
///
/// `Running`
///     - on `ReportDatabaseFailure`
///         - wait for the inflight `BarrierCompletingTask` to finish if there is any, mark the database as blocked in command queue
///         - send `ResetDatabaseRequest` with `reset_request_id` as 0 to all CNs, and save `reset_request_id` and the set of nodes that need to collect response.
///         - enter `Resetting` state.
///     - on `BarrierComplete`: update the `DatabaseCheckpointControl`.
///     - on `DatabaseReset`: unreachable
/// `Resetting`
///     - on `ReportDatabaseFailure` or `BarrierComplete`: ignore
///     - on `DatabaseReset`:
///         - if the `reset_request_id` in the response is less than the saved `reset_request_id`, ignore
///         - otherwise, mark the CN as collected.
///         - when all CNs have collected the response:
///             - load the database runtime info from catalog manager and fragment manager
///             - inject the initial barrier to CNs, save the set of nodes that need to collect response
///             - enter `Initializing` state
/// `Initializing`
///     - on `BarrierComplete`:
///         - mark the CN as collected
///         - when all CNs have collected the response: enter Running
///     - on `ReportDatabaseFailure`
///         - increment the previously saved `reset_request_id`, and send `ResetDatabaseRequest` to all CNs
///         - enter `Resetting`
///     - on `DatabaseReset`: unreachable
enum DatabaseRecoveringStage {
    /// Waiting for all workers to acknowledge the ongoing `ResetDatabaseRequest`,
    /// possibly while a retry backoff timer is still pending.
    Resetting {
        /// Workers that have not yet responded to the ongoing reset request.
        remaining_workers: HashSet<WorkerId>,
        /// Reset responses collected so far, keyed by worker.
        reset_resps: HashMap<WorkerId, ResetDatabaseResponse>,
        /// Id of the ongoing reset request; responses carrying a smaller id are stale.
        reset_request_id: u32,
        /// Backoff that must elapse before entering `Initializing`;
        /// `None` once it has elapsed (or when no backoff is required).
        backoff_future: Option<RetryBackoffFuture>,
    },
    /// Initial barrier injected; waiting for all workers to collect it.
    Initializing {
        initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
    },
}
81
/// Per-database recovery state: the current stage of the recovery state machine
/// plus bookkeeping for retries and recovery metrics.
pub(crate) struct DatabaseRecoveringState {
    /// Current stage (`Resetting` or `Initializing`).
    stage: DatabaseRecoveringStage,
    /// Monotonically increasing id to assign to the next `ResetDatabaseRequest`.
    next_reset_request_id: u32,
    /// Yields the backoff future for each successive retry.
    retry_backoff_strategy: RetryBackoffStrategy,
    /// Failure counter and latency timer for this database's recovery.
    metrics: DatabaseRecoveryMetrics,
}
88
/// Action requested by [`DatabaseRecoveringState::poll_next_event`], to be
/// carried out by the caller.
pub(super) enum RecoveringStateAction {
    /// All reset responses collected (and any backoff elapsed): the caller should
    /// load the database runtime info and inject the initial barrier.
    EnterInitializing(HashMap<WorkerId, ResetDatabaseResponse>),
    /// Initial barrier fully collected: the caller should move the database back
    /// to the running status.
    EnterRunning,
}
93
/// Metrics tracked over the lifetime of a single database recovery.
struct DatabaseRecoveryMetrics {
    // Incremented on each recovery failure/retry for this database.
    recovery_failure_cnt: IntCounter,
    // Started when recovery begins; taken and observed exactly once on success
    // (see `DatabaseStatusAction<EnterRunning>::enter`).
    recovery_timer: Option<HistogramTimer>,
}
98
99impl DatabaseRecoveryMetrics {
100    fn new(database_id: DatabaseId) -> Self {
101        let database_id_str = format!("database {}", database_id.database_id);
102        Self {
103            recovery_failure_cnt: GLOBAL_META_METRICS
104                .recovery_failure_cnt
105                .with_label_values(&[database_id_str.as_str()]),
106            recovery_timer: Some(
107                GLOBAL_META_METRICS
108                    .recovery_latency
109                    .with_label_values(&[database_id_str.as_str()])
110                    .start_timer(),
111            ),
112        }
113    }
114}
115
/// Request id used for the very first `ResetDatabaseRequest` of a recovery.
const INITIAL_RESET_REQUEST_ID: u32 = 0;
117
impl DatabaseRecoveringState {
    /// Creates a recovery state that starts directly in `Resetting`: sends the
    /// initial reset request to all relevant workers and arms the first backoff.
    pub(super) fn resetting(
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Self {
        let mut retry_backoff_strategy = get_retry_backoff_strategy();
        let backoff_future = retry_backoff_strategy.next().unwrap();
        let metrics = DatabaseRecoveryMetrics::new(database_id);
        metrics.recovery_failure_cnt.inc();

        Self {
            stage: DatabaseRecoveringStage::Resetting {
                // workers that must acknowledge the reset before we can proceed
                remaining_workers: control_stream_manager
                    .reset_database(database_id, INITIAL_RESET_REQUEST_ID),
                reset_resps: Default::default(),
                reset_request_id: INITIAL_RESET_REQUEST_ID,
                backoff_future: Some(backoff_future),
            },
            next_reset_request_id: INITIAL_RESET_REQUEST_ID + 1,
            retry_backoff_strategy,
            metrics,
        }
    }

    /// Returns the backoff future and a fresh (strictly increasing) reset request
    /// id for the next retry attempt.
    fn next_retry(&mut self) -> (RetryBackoffFuture, u32) {
        let backoff_future = self
            .retry_backoff_strategy
            .next()
            .expect("should not be empty");
        let request_id = self.next_reset_request_id;
        self.next_reset_request_id += 1;
        (backoff_future, request_id)
    }

    /// Handles a `BarrierComplete` response received while recovering: marks the
    /// worker as collected during `Initializing`, ignored during `Resetting`.
    pub(super) fn barrier_collected(
        &mut self,
        database_id: DatabaseId,
        resp: BarrierCompleteResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting { .. } => {
                // ignore the collected barrier on resetting or backoff
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
            } => {
                let worker_id = resp.worker_id as WorkerId;
                initial_barrier_collector.collect_resp(resp);
                info!(
                    ?database_id,
                    worker_id,
                    remaining_workers = ?initial_barrier_collector,
                    "initializing database barrier collected"
                );
            }
        }
    }

    /// Called when a worker connection fails. Returns whether the current stage
    /// can still make progress: always `true` during `Resetting` (the failed
    /// worker is simply dropped from the pending set), delegated to the barrier
    /// collector during `Initializing`.
    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers, ..
            } => {
                remaining_workers.remove(&worker_id);
                true
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
        }
    }

    /// Handles a `ResetDatabaseResponse` from `worker_id`.
    ///
    /// Stale responses (smaller `reset_request_id` than the ongoing one) are
    /// logged and dropped; a current response marks the worker as collected.
    /// Receiving a response while `Initializing` violates the state machine and
    /// panics.
    pub(super) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                reset_request_id,
                ..
            } => {
                if resp.reset_request_id < *reset_request_id {
                    info!(
                        database_id = resp.database_id,
                        worker_id,
                        received_request_id = resp.reset_request_id,
                        ongoing_request_id = reset_request_id,
                        "ignore stale reset response"
                    );
                } else {
                    // a response can never carry an id larger than the ongoing one
                    assert_eq!(resp.reset_request_id, *reset_request_id);
                    assert!(remaining_workers.remove(&worker_id));
                    reset_resps
                        .try_insert(worker_id, resp)
                        .expect("non-duplicate");
                }
            }
            DatabaseRecoveringStage::Initializing { .. } => {
                unreachable!("all reset resp should have been received in Resetting")
            }
        }
    }

    /// Polls for the next state-machine transition.
    ///
    /// Returns `EnterInitializing` once all reset responses are in AND the
    /// backoff has elapsed, `EnterRunning` once the initial barrier has been
    /// fully collected, and `Pending` otherwise.
    pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                backoff_future: backoff_future_option,
                ..
            } => {
                // poll the backoff first so the waker is registered even when
                // workers are still pending
                let pass_backoff = if let Some(backoff_future) = backoff_future_option {
                    if backoff_future.poll_unpin(cx).is_ready() {
                        *backoff_future_option = None;
                        true
                    } else {
                        false
                    }
                } else {
                    true
                };
                if pass_backoff && remaining_workers.is_empty() {
                    return Poll::Ready(RecoveringStateAction::EnterInitializing(take(
                        reset_resps,
                    )));
                }
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => {
                if initial_barrier_collector.is_collected() {
                    return Poll::Ready(RecoveringStateAction::EnterRunning);
                }
            }
        }
        Poll::Pending
    }

    /// Returns the in-flight database state while `Initializing`, or `None`
    /// during `Resetting` (when no state exists yet).
    pub(super) fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match &self.stage {
            DatabaseRecoveringStage::Resetting { .. } => None,
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => Some(initial_barrier_collector.database_state()),
        }
    }
}
276
/// A pending status-transition action (e.g. `EnterReset`, `EnterInitializing`,
/// `EnterRunning`) bound to one database of the checkpoint control.
pub(crate) struct DatabaseStatusAction<'a, A> {
    /// The checkpoint control that owns the database, borrowed for the action's lifetime.
    control: &'a mut CheckpointControl,
    /// The database this action targets.
    database_id: DatabaseId,
    /// The action payload describing the transition to perform.
    pub(crate) action: A,
}
282
impl<A> DatabaseStatusAction<'_, A> {
    /// The database this action targets.
    pub(crate) fn database_id(&self) -> DatabaseId {
        self.database_id
    }
}
288
289impl CheckpointControl {
290    pub(super) fn new_database_status_action<A>(
291        &mut self,
292        database_id: DatabaseId,
293        action: A,
294    ) -> DatabaseStatusAction<'_, A> {
295        DatabaseStatusAction {
296            control: self,
297            database_id,
298            action,
299        }
300    }
301}
302
/// Marker action: transition a database into the `Resetting` stage.
pub(crate) struct EnterReset;
304
impl DatabaseStatusAction<'_, EnterReset> {
    /// Performs the transition into `Resetting`.
    ///
    /// If a completed barrier output is supplied, it is acknowledged first so no
    /// completing task is left in flight. From `Running` this starts a fresh
    /// recovery (request id 0, no backoff, recovery-start event); from
    /// `Initializing` it retries with a new request id and backoff
    /// (recovery-failure event). Re-entering from `Resetting` is a state-machine
    /// violation and panics.
    pub(crate) fn enter(
        self,
        barrier_complete_output: Option<BarrierCompleteOutput>,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        if let Some(output) = barrier_complete_output {
            self.control.ack_completed(output);
        }
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                // first failure: start a brand-new recovery with no backoff
                let reset_request_id = INITIAL_RESET_REQUEST_ID;
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                let metrics = DatabaseRecoveryMetrics::new(self.database_id);
                event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                    EventRecovery::database_recovery_start(self.database_id.database_id),
                )]);
                *database_status =
                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
                        stage: DatabaseRecoveringStage::Resetting {
                            remaining_workers,
                            reset_resps: Default::default(),
                            reset_request_id,
                            backoff_future: None,
                        },
                        next_reset_request_id: reset_request_id + 1,
                        retry_backoff_strategy: get_retry_backoff_strategy(),
                        metrics,
                    });
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    unreachable!("should not enter resetting again")
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    // initialization failed: record the failure and retry the
                    // reset with a larger request id plus backoff
                    event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                        EventRecovery::database_recovery_failure(self.database_id.database_id),
                    )]);
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(self.database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                }
            },
        }
    }
}
365
366impl CheckpointControl {
367    pub(crate) fn on_report_failure(
368        &mut self,
369        database_id: DatabaseId,
370        control_stream_manager: &mut ControlStreamManager,
371    ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
372        let database_status = self.databases.get_mut(&database_id).expect("should exist");
373        match database_status {
374            DatabaseCheckpointControlStatus::Running(_) => {
375                Some(self.new_database_status_action(database_id, EnterReset))
376            }
377            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
378                DatabaseRecoveringStage::Resetting { .. } => {
379                    // ignore reported failure during resetting or backoff.
380                    None
381                }
382                DatabaseRecoveringStage::Initializing { .. } => {
383                    warn!(database_id = database_id.database_id, "");
384                    let (backoff_future, reset_request_id) = state.next_retry();
385                    let remaining_workers =
386                        control_stream_manager.reset_database(database_id, reset_request_id);
387                    state.metrics.recovery_failure_cnt.inc();
388                    state.stage = DatabaseRecoveringStage::Resetting {
389                        remaining_workers,
390                        reset_resps: Default::default(),
391                        reset_request_id,
392                        backoff_future: Some(backoff_future),
393                    };
394                    None
395                }
396            },
397        }
398    }
399}
400
/// Action carrying the reset responses collected from each worker, used to
/// transition a database from `Resetting` into `Initializing`.
pub(crate) struct EnterInitializing(pub(crate) HashMap<WorkerId, ResetDatabaseResponse>);
402
impl DatabaseStatusAction<'_, EnterInitializing> {
    /// Read-only access to the underlying checkpoint control.
    pub(crate) fn control(&self) -> &CheckpointControl {
        &*self.control
    }

    /// Performs the transition from `Resetting` into `Initializing`.
    ///
    /// Builds the fragment edges from the loaded runtime info and injects the
    /// initial barrier to the workers. On success the stage becomes
    /// `Initializing`; on failure the reset is retried with a new request id and
    /// backoff. Calling this while `Running` or already `Initializing` violates
    /// the state machine and panics.
    pub(crate) fn enter(
        self,
        runtime_info: DatabaseRuntimeInfoSnapshot,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        // narrow to the recovering state; only valid from `Resetting`
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        let DatabaseRuntimeInfoSnapshot {
            job_infos,
            mut state_table_committed_epochs,
            mut state_table_log_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            mut source_splits,
            mut background_jobs,
        } = runtime_info;
        let result: MetaResult<_> = try {
            let mut builder =
                FragmentEdgeBuilder::new(job_infos.values().flatten(), control_stream_manager);
            builder.add_relations(&fragment_relations);
            let mut edges = builder.build();
            // NOTE(review): the `false` flag's meaning is defined by
            // `inject_database_initial_barrier` — confirm against its signature.
            control_stream_manager.inject_database_initial_barrier(
                self.database_id,
                job_infos,
                &mut state_table_committed_epochs,
                &mut state_table_log_epochs,
                &mut edges,
                &stream_actors,
                &mut source_splits,
                &mut background_jobs,
                subscription_info,
                false,
                &self.control.hummock_version_stats,
            )?
        };
        match result {
            Ok(initial_barrier_collector) => {
                info!(node_to_collect = ?initial_barrier_collector, database_id = ?self.database_id, "database enter initializing");
                status.stage = DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector: initial_barrier_collector.into(),
                };
            }
            Err(e) => {
                // injection failed: fall back to another reset round with backoff
                warn!(
                    database_id = self.database_id.database_id,
                    e = %e.as_report(),
                    "failed to inject initial barrier"
                );
                let (backoff_future, reset_request_id) = status.next_retry();
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                status.metrics.recovery_failure_cnt.inc();
                status.stage = DatabaseRecoveringStage::Resetting {
                    remaining_workers,
                    reset_resps: Default::default(),
                    reset_request_id,
                    backoff_future: Some(backoff_future),
                };
            }
        }
    }

    /// Drops the database from the checkpoint control entirely (e.g. when the
    /// database no longer exists in the runtime info).
    pub(crate) fn remove(self) {
        self.control
            .databases
            .remove(&self.database_id)
            .expect("should exist");
    }
}
492
/// Marker action: transition a database from `Initializing` back to `Running`.
pub(crate) struct EnterRunning;
494
impl DatabaseStatusAction<'_, EnterRunning> {
    /// Performs the transition from `Initializing` back to `Running`.
    ///
    /// Emits a recovery-success event, observes the recovery latency timer, and
    /// converts the initial barrier collector into the running database control.
    /// Calling this while `Running` or `Resetting` violates the state machine
    /// and panics.
    pub(crate) fn enter(self) {
        info!(database_id = ?self.database_id, "database enter running");
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::database_recovery_success(self.database_id.database_id),
        )]);
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter running again")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                // placeholder swapped in so the `Initializing` stage can be
                // moved out of the borrowed state below
                let temp_place_holder = DatabaseRecoveringStage::Resetting {
                    remaining_workers: Default::default(),
                    reset_resps: Default::default(),
                    reset_request_id: 0,
                    backoff_future: None,
                };
                // observe the recovery latency exactly once; taking it twice is
                // a bug (panic in debug, warn in release)
                match state.metrics.recovery_timer.take() {
                    Some(recovery_timer) => {
                        recovery_timer.observe_duration();
                    }
                    _ => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "take database {} recovery latency for twice",
                                self.database_id
                            )
                        } else {
                            warn!(database_id = %self.database_id,"failed to take recovery latency")
                        }
                    }
                }
                match replace(&mut state.stage, temp_place_holder) {
                    DatabaseRecoveringStage::Resetting { .. } => {
                        unreachable!("can only enter running during initializing")
                    }
                    DatabaseRecoveringStage::Initializing {
                        initial_barrier_collector,
                    } => {
                        *database_status = DatabaseCheckpointControlStatus::Running(
                            initial_barrier_collector.finish(),
                        );
                    }
                }
            }
        }
    }
}