risingwave_meta/barrier/checkpoint/
recovery.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::mem::{replace, take};
17use std::sync::LazyLock;
18use std::task::{Context, Poll};
19
20use futures::FutureExt;
21use prometheus::{HistogramTimer, IntCounter};
22use risingwave_common::catalog::{DatabaseId, TableId};
23use risingwave_meta_model::WorkerId;
24use risingwave_pb::meta::event_log::{Event, EventRecovery};
25use risingwave_pb::stream_service::BarrierCompleteResponse;
26use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
27use thiserror_ext::AsReport;
28use tracing::{info, warn};
29
30use crate::MetaResult;
31use crate::barrier::DatabaseRuntimeInfoSnapshot;
32use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
33use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
34use crate::barrier::checkpoint::{BarrierWorkerState, CheckpointControl};
35use crate::barrier::complete_task::BarrierCompleteOutput;
36use crate::barrier::edge_builder::FragmentEdgeBuilder;
37use crate::barrier::rpc::{ControlStreamManager, DatabaseInitialBarrierCollector};
38use crate::barrier::worker::{
39    RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
40};
41use crate::rpc::metrics::GLOBAL_META_METRICS;
42
/// We can treat each database as a state machine of 3 states: `Running`, `Resetting` and `Initializing`.
/// The state transition can be triggered when receiving 3 variants of response: `ReportDatabaseFailure`, `BarrierComplete`, `DatabaseReset`.
/// The logic of state transition can be summarized as follows:
///
/// `Running`
///     - on `ReportDatabaseFailure`
///         - wait for the inflight `BarrierCompletingTask` to finish if there is any, mark the database as blocked in command queue
///         - send `ResetDatabaseRequest` with `reset_request_id` as 0 to all CNs, and save `reset_request_id` and the set of nodes that need to collect response.
///         - enter `Resetting` state.
///     - on `BarrierComplete`: update the `DatabaseCheckpointControl`.
///     - on `DatabaseReset`: unreachable
/// `Resetting`
///     - on `ReportDatabaseFailure` or `BarrierComplete`: ignore
///     - on `DatabaseReset`:
///         - if the `reset_request_id` in the response is less than the saved `reset_request_id`, ignore
///         - otherwise, mark the CN as collected.
///         - when all CNs have collected the response:
///             - load the database runtime info from catalog manager and fragment manager
///             - inject the initial barrier to CNs, save the set of nodes that need to collect response
///             - enter `Initializing` state
/// `Initializing`
///     - on `BarrierComplete`:
///         - mark the CN as collected
///         - when all CNs have collected the response: enter Running
///     - on `ReportDatabaseFailure`
///         - increment the previously saved `reset_request_id`, and send `ResetDatabaseRequest` to all CNs
///         - enter `Resetting`
///     - on `DatabaseReset`: unreachable
enum DatabaseRecoveringStage {
    /// Waiting for every worker to acknowledge the current `ResetDatabaseRequest`,
    /// optionally after a retry backoff period.
    Resetting {
        // Workers that have not yet responded to the ongoing reset round.
        remaining_workers: HashSet<WorkerId>,
        // Reset responses collected so far, keyed by worker.
        reset_resps: HashMap<WorkerId, ResetDatabaseResponse>,
        // Id of the ongoing reset round; responses carrying a smaller id are stale and ignored.
        reset_request_id: u32,
        // Backoff to await before leaving `Resetting`; `None` once elapsed or when no backoff applies.
        backoff_future: Option<RetryBackoffFuture>,
    },
    /// Initial barrier injected; waiting for all workers to collect it.
    Initializing {
        initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
    },
}
82
/// Recovery-time state of a single database: the current stage of the recovery
/// state machine plus retry and metrics bookkeeping.
pub(crate) struct DatabaseRecoveringState {
    // Current stage (`Resetting` or `Initializing`).
    stage: DatabaseRecoveringStage,
    // Monotonically increasing id assigned to the next `ResetDatabaseRequest`.
    next_reset_request_id: u32,
    // Yields the backoff future for each successive reset retry.
    retry_backoff_strategy: RetryBackoffStrategy,
    // Failure counter and recovery-latency timer for this database.
    metrics: DatabaseRecoveryMetrics,
}
89
/// Stage transition requested by [`DatabaseRecoveringState::poll_next_event`].
pub(super) enum RecoveringStateAction {
    // All reset responses collected and backoff elapsed: move to `Initializing`,
    // carrying the per-worker reset responses.
    EnterInitializing(HashMap<WorkerId, ResetDatabaseResponse>),
    // Initial barrier fully collected: move to `Running`.
    EnterRunning,
}
94
/// Per-database recovery metrics handles.
struct DatabaseRecoveryMetrics {
    // Incremented on each reset/retry attempt.
    recovery_failure_cnt: IntCounter,
    // Started at recovery begin; taken and observed exactly once when entering `Running`.
    recovery_timer: Option<HistogramTimer>,
}
99
100impl DatabaseRecoveryMetrics {
101    fn new(database_id: DatabaseId) -> Self {
102        let database_id_str = format!("database {}", database_id.database_id);
103        Self {
104            recovery_failure_cnt: GLOBAL_META_METRICS
105                .recovery_failure_cnt
106                .with_label_values(&[database_id_str.as_str()]),
107            recovery_timer: Some(
108                GLOBAL_META_METRICS
109                    .recovery_latency
110                    .with_label_values(&[database_id_str.as_str()])
111                    .start_timer(),
112            ),
113        }
114    }
115}
116
/// Request id used for the very first reset round of a database recovery.
const INITIAL_RESET_REQUEST_ID: u32 = 0;
118
impl DatabaseRecoveringState {
    /// Creates a recovering state in the `Resetting` stage: broadcasts the initial
    /// `ResetDatabaseRequest` to all workers, bumps the failure counter, and arms
    /// the first retry backoff.
    pub(super) fn resetting(
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Self {
        let mut retry_backoff_strategy = get_retry_backoff_strategy();
        // NOTE(review): assumes the backoff strategy always yields — confirm in `get_retry_backoff_strategy`.
        let backoff_future = retry_backoff_strategy.next().unwrap();
        let metrics = DatabaseRecoveryMetrics::new(database_id);
        metrics.recovery_failure_cnt.inc();

        Self {
            stage: DatabaseRecoveringStage::Resetting {
                remaining_workers: control_stream_manager
                    .reset_database(database_id, INITIAL_RESET_REQUEST_ID),
                reset_resps: Default::default(),
                reset_request_id: INITIAL_RESET_REQUEST_ID,
                backoff_future: Some(backoff_future),
            },
            // Request ids are handed out sequentially; the next retry gets id 1.
            next_reset_request_id: INITIAL_RESET_REQUEST_ID + 1,
            retry_backoff_strategy,
            metrics,
        }
    }

    /// Returns the backoff future and request id for the next reset retry,
    /// advancing `next_reset_request_id`.
    fn next_retry(&mut self) -> (RetryBackoffFuture, u32) {
        let backoff_future = self
            .retry_backoff_strategy
            .next()
            .expect("should not be empty");
        let request_id = self.next_reset_request_id;
        self.next_reset_request_id += 1;
        (backoff_future, request_id)
    }

    /// Handles a `BarrierComplete` response received while recovering: ignored
    /// during `Resetting`; during `Initializing`, records the collection from
    /// the responding worker.
    pub(super) fn barrier_collected(
        &mut self,
        database_id: DatabaseId,
        resp: BarrierCompleteResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting { .. } => {
                // ignore the collected barrier on resetting or backoff
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
            } => {
                let worker_id = resp.worker_id as WorkerId;
                initial_barrier_collector.collect_resp(resp);
                info!(
                    ?database_id,
                    worker_id,
                    remaining_workers = ?initial_barrier_collector,
                    "initializing database barrier collected"
                );
            }
        }
    }

    /// Handles a worker failure. Returns whether the current stage remains valid
    /// without the failed worker; while `Resetting` the worker is simply dropped
    /// from the waiting set and the stage is always valid.
    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers, ..
            } => {
                remaining_workers.remove(&worker_id);
                true
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
        }
    }

    /// Handles a `ResetDatabaseResponse` from `worker_id`.
    ///
    /// Responses from an older reset round (smaller `reset_request_id`) are
    /// logged and ignored; a response for the current round must come from a
    /// still-pending worker and is recorded exactly once.
    pub(super) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                reset_request_id,
                ..
            } => {
                if resp.reset_request_id < *reset_request_id {
                    info!(
                        database_id = resp.database_id,
                        worker_id,
                        received_request_id = resp.reset_request_id,
                        ongoing_request_id = reset_request_id,
                        "ignore stale reset response"
                    );
                } else {
                    // Ids only grow through retries we issued ourselves, so a
                    // non-stale response must match the ongoing round exactly.
                    assert_eq!(resp.reset_request_id, *reset_request_id);
                    assert!(remaining_workers.remove(&worker_id));
                    reset_resps
                        .try_insert(worker_id, resp)
                        .expect("non-duplicate");
                }
            }
            DatabaseRecoveringStage::Initializing { .. } => {
                unreachable!("all reset resp should have been received in Resetting")
            }
        }
    }

    /// Polls for the next stage transition.
    ///
    /// Returns `EnterInitializing` once all reset responses are in *and* the
    /// retry backoff (if any) has elapsed, or `EnterRunning` once the initial
    /// barrier has been collected from every worker. `Pending` otherwise.
    pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                backoff_future: backoff_future_option,
                ..
            } => {
                // Drop the backoff future once it resolves so later polls skip it.
                let pass_backoff = if let Some(backoff_future) = backoff_future_option {
                    if backoff_future.poll_unpin(cx).is_ready() {
                        *backoff_future_option = None;
                        true
                    } else {
                        false
                    }
                } else {
                    true
                };
                if pass_backoff && remaining_workers.is_empty() {
                    return Poll::Ready(RecoveringStateAction::EnterInitializing(take(
                        reset_resps,
                    )));
                }
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => {
                if initial_barrier_collector.is_collected() {
                    return Poll::Ready(RecoveringStateAction::EnterRunning);
                }
            }
        }
        Poll::Pending
    }

    /// Returns the database's barrier worker state while `Initializing` (paired
    /// with an empty creating-jobs map), or `None` while `Resetting`.
    pub(super) fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match &self.stage {
            DatabaseRecoveringStage::Resetting { .. } => None,
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => Some((initial_barrier_collector.database_state(), {
                // Shared empty map to satisfy the borrowed return type; presumably
                // no creating jobs exist at this point of recovery — confirm with callers.
                static EMPTY_CREATING_JOBS: LazyLock<
                    HashMap<TableId, CreatingStreamingJobControl>,
                > = LazyLock::new(HashMap::new);
                &EMPTY_CREATING_JOBS
            })),
        }
    }
}
282
/// A pending state-transition action `A` targeting `database_id`, holding a
/// mutable borrow of the checkpoint control so the transition can be applied.
pub(crate) struct DatabaseStatusAction<'a, A> {
    control: &'a mut CheckpointControl,
    database_id: DatabaseId,
    pub(crate) action: A,
}
288
289impl<A> DatabaseStatusAction<'_, A> {
290    pub(crate) fn database_id(&self) -> DatabaseId {
291        self.database_id
292    }
293}
294
295impl CheckpointControl {
296    pub(super) fn new_database_status_action<A>(
297        &mut self,
298        database_id: DatabaseId,
299        action: A,
300    ) -> DatabaseStatusAction<'_, A> {
301        DatabaseStatusAction {
302            control: self,
303            database_id,
304            action,
305        }
306    }
307}
308
/// Marker action: transition a database into the `Resetting` stage.
pub(crate) struct EnterReset;
310
impl DatabaseStatusAction<'_, EnterReset> {
    /// Transitions the database into the `Resetting` stage.
    ///
    /// Any finished barrier-completing output is acked first. From `Running`,
    /// this starts the first reset round (no backoff) and logs a recovery-start
    /// event. From `Initializing`, it logs a recovery-failure event and starts a
    /// retry round with the next request id and a backoff. Re-entering from
    /// `Resetting` is a logic error.
    pub(crate) fn enter(
        self,
        barrier_complete_output: Option<BarrierCompleteOutput>,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        if let Some(output) = barrier_complete_output {
            self.control.ack_completed(output);
        }
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                let reset_request_id = INITIAL_RESET_REQUEST_ID;
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                let metrics = DatabaseRecoveryMetrics::new(self.database_id);
                // A fresh recovery begins for a previously running database.
                event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                    EventRecovery::database_recovery_start(self.database_id.database_id),
                )]);
                *database_status =
                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
                        stage: DatabaseRecoveringStage::Resetting {
                            remaining_workers,
                            reset_resps: Default::default(),
                            reset_request_id,
                            // First round resets immediately, without a backoff wait.
                            backoff_future: None,
                        },
                        next_reset_request_id: reset_request_id + 1,
                        retry_backoff_strategy: get_retry_backoff_strategy(),
                        metrics,
                    });
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    unreachable!("should not enter resetting again")
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    // The previous recovery attempt failed mid-initialization.
                    event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                        EventRecovery::database_recovery_failure(self.database_id.database_id),
                    )]);
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(self.database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                }
            },
        }
    }
}
371
372impl CheckpointControl {
373    pub(crate) fn on_report_failure(
374        &mut self,
375        database_id: DatabaseId,
376        control_stream_manager: &mut ControlStreamManager,
377    ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
378        let database_status = self.databases.get_mut(&database_id).expect("should exist");
379        match database_status {
380            DatabaseCheckpointControlStatus::Running(_) => {
381                Some(self.new_database_status_action(database_id, EnterReset))
382            }
383            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
384                DatabaseRecoveringStage::Resetting { .. } => {
385                    // ignore reported failure during resetting or backoff.
386                    None
387                }
388                DatabaseRecoveringStage::Initializing { .. } => {
389                    warn!(database_id = database_id.database_id, "");
390                    let (backoff_future, reset_request_id) = state.next_retry();
391                    let remaining_workers =
392                        control_stream_manager.reset_database(database_id, reset_request_id);
393                    state.metrics.recovery_failure_cnt.inc();
394                    state.stage = DatabaseRecoveringStage::Resetting {
395                        remaining_workers,
396                        reset_resps: Default::default(),
397                        reset_request_id,
398                        backoff_future: Some(backoff_future),
399                    };
400                    None
401                }
402            },
403        }
404    }
405}
406
/// Action: transition a database from `Resetting` into `Initializing`, carrying
/// the reset responses collected from each worker.
pub(crate) struct EnterInitializing(pub(crate) HashMap<WorkerId, ResetDatabaseResponse>);
408
impl DatabaseStatusAction<'_, EnterInitializing> {
    /// Read-only access to the checkpoint control, e.g. for loading runtime info.
    pub(crate) fn control(&self) -> &CheckpointControl {
        &*self.control
    }

    /// Transitions the database from `Resetting` into `Initializing` by building
    /// fragment edges from `runtime_info` and injecting the initial barrier.
    ///
    /// On injection failure, falls back to another `Resetting` round with the
    /// next request id and a backoff.
    pub(crate) fn enter(
        self,
        runtime_info: DatabaseRuntimeInfoSnapshot,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        let DatabaseRuntimeInfoSnapshot {
            database_fragment_info,
            mut state_table_committed_epochs,
            subscription_info,
            stream_actors,
            fragment_relations,
            mut source_splits,
            mut background_jobs,
        } = runtime_info;
        let result: MetaResult<_> = try {
            let mut builder = FragmentEdgeBuilder::new(database_fragment_info.fragment_infos());
            builder.add_relations(&fragment_relations);
            let mut edges = builder.build();
            control_stream_manager.inject_database_initial_barrier(
                self.database_id,
                database_fragment_info,
                &mut state_table_committed_epochs,
                &mut edges,
                &stream_actors,
                &mut source_splits,
                &mut background_jobs,
                subscription_info,
                // NOTE(review): meaning of this boolean flag is not visible here — confirm against the method signature.
                false,
                &self.control.hummock_version_stats,
            )?
        };
        match result {
            Ok(initial_barrier_collector) => {
                info!(node_to_collect = ?initial_barrier_collector, database_id = ?self.database_id, "database enter initializing");
                status.stage = DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector: initial_barrier_collector.into(),
                };
            }
            Err(e) => {
                warn!(
                    database_id = self.database_id.database_id,
                    e = %e.as_report(),
                    "failed to inject initial barrier"
                );
                // Retry: start another reset round after a backoff.
                let (backoff_future, reset_request_id) = status.next_retry();
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                status.metrics.recovery_failure_cnt.inc();
                status.stage = DatabaseRecoveringStage::Resetting {
                    remaining_workers,
                    reset_resps: Default::default(),
                    reset_request_id,
                    backoff_future: Some(backoff_future),
                };
            }
        }
    }

    /// Removes the database from checkpoint control entirely — presumably used
    /// when the database no longer exists; confirm with callers.
    pub(crate) fn remove(self) {
        self.control
            .databases
            .remove(&self.database_id)
            .expect("should exist");
    }
}
495
/// Marker action: transition a database from `Initializing` into `Running`.
pub(crate) struct EnterRunning;
497
498impl DatabaseStatusAction<'_, EnterRunning> {
499    pub(crate) fn enter(self) {
500        info!(database_id = ?self.database_id, "database enter running");
501        let event_log_manager_ref = self.control.env.event_log_manager_ref();
502        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
503            EventRecovery::database_recovery_success(self.database_id.database_id),
504        )]);
505        let database_status = self
506            .control
507            .databases
508            .get_mut(&self.database_id)
509            .expect("should exist");
510        match database_status {
511            DatabaseCheckpointControlStatus::Running(_) => {
512                unreachable!("should not enter running again")
513            }
514            DatabaseCheckpointControlStatus::Recovering(state) => {
515                let temp_place_holder = DatabaseRecoveringStage::Resetting {
516                    remaining_workers: Default::default(),
517                    reset_resps: Default::default(),
518                    reset_request_id: 0,
519                    backoff_future: None,
520                };
521                match state.metrics.recovery_timer.take() {
522                    Some(recovery_timer) => {
523                        recovery_timer.observe_duration();
524                    }
525                    _ => {
526                        if cfg!(debug_assertions) {
527                            panic!(
528                                "take database {} recovery latency for twice",
529                                self.database_id
530                            )
531                        } else {
532                            warn!(database_id = %self.database_id,"failed to take recovery latency")
533                        }
534                    }
535                }
536                match replace(&mut state.stage, temp_place_holder) {
537                    DatabaseRecoveringStage::Resetting { .. } => {
538                        unreachable!("can only enter running during initializing")
539                    }
540                    DatabaseRecoveringStage::Initializing {
541                        initial_barrier_collector,
542                    } => {
543                        *database_status = DatabaseCheckpointControlStatus::Running(
544                            initial_barrier_collector.finish(),
545                        );
546                    }
547                }
548            }
549        }
550    }
551}