risingwave_meta/barrier/checkpoint/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::mem::{replace, take};
use std::task::{Context, Poll};

use futures::FutureExt;
use prometheus::{HistogramTimer, IntCounter};
use risingwave_common::catalog::DatabaseId;
use risingwave_common::id::JobId;
use risingwave_meta_model::WorkerId;
use risingwave_pb::meta::event_log::{Event, EventRecovery};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::checkpoint::control::DatabaseCheckpointControlStatus;
use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
use crate::barrier::checkpoint::{BarrierWorkerState, CheckpointControl};
use crate::barrier::complete_task::BarrierCompleteOutput;
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::rpc::{ControlStreamManager, DatabaseInitialBarrierCollector};
use crate::barrier::worker::{
    RetryBackoffFuture, RetryBackoffStrategy, get_retry_backoff_strategy,
};
use crate::rpc::metrics::GLOBAL_META_METRICS;

/// We can treat each database as a state machine with 3 states: `Running`, `Resetting` and `Initializing`.
/// A state transition is triggered by receiving one of 3 response variants: `ReportDatabaseFailure`, `BarrierComplete`, `DatabaseReset`.
/// The state transition logic can be summarized as follows:
///
/// `Running`
///     - on `ReportDatabaseFailure`
///         - wait for the in-flight `BarrierCompletingTask` to finish if there is any, and mark the database as blocked in the command queue
///         - send `ResetDatabaseRequest` with `reset_request_id` 0 to all CNs, and save the `reset_request_id` and the set of nodes whose responses are yet to be collected.
///         - enter the `Resetting` state.
///     - on `BarrierComplete`: update the `DatabaseCheckpointControl`.
///     - on `DatabaseReset`: unreachable
/// `Resetting`
///     - on `ReportDatabaseFailure` or `BarrierComplete`: ignore
///     - on `DatabaseReset`:
///         - if the `reset_request_id` in the response is less than the saved `reset_request_id`, ignore
///         - otherwise, mark the CN as collected.
///         - when responses from all CNs have been collected:
///             - load the database runtime info from the catalog manager and fragment manager
///             - inject the initial barrier to the CNs, and save the set of nodes whose responses are yet to be collected
///             - enter the `Initializing` state
/// `Initializing`
///     - on `BarrierComplete`:
///         - mark the CN as collected
///         - when responses from all CNs have been collected: enter `Running`
///     - on `ReportDatabaseFailure`
///         - increment the previously saved `reset_request_id`, and send `ResetDatabaseRequest` to all CNs
///         - enter `Resetting`
///     - on `DatabaseReset`: unreachable
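///
/// A single successful recovery round roughly follows the transitions below (an illustrative
/// sketch of the states described above, not an exhaustive trace):
///
/// ```text
/// Running --ReportDatabaseFailure--> Resetting { reset_request_id: n }
///     all CNs respond with DatabaseReset(n) and the backoff elapses
/// Resetting --> Initializing (initial barrier injected)
///     all CNs report BarrierComplete for the initial barrier
/// Initializing --> Running
/// ```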
enum DatabaseRecoveringStage {
    Resetting {
        remaining_workers: HashSet<WorkerId>,
        reset_resps: HashMap<WorkerId, ResetDatabaseResponse>,
        reset_request_id: u32,
        backoff_future: Option<RetryBackoffFuture>,
    },
    Initializing {
        initial_barrier_collector: Box<DatabaseInitialBarrierCollector>,
    },
}

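/// Per-database recovery state: the current [`DatabaseRecoveringStage`], the next reset request id
/// to use (monotonically increasing across retries), the retry backoff strategy, and recovery metrics.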
pub(crate) struct DatabaseRecoveringState {
    stage: DatabaseRecoveringStage,
    next_reset_request_id: u32,
    retry_backoff_strategy: RetryBackoffStrategy,
    metrics: DatabaseRecoveryMetrics,
}

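/// Action to take after [`DatabaseRecoveringState::poll_next_event`] resolves: either enter
/// `Initializing` with the reset responses collected from the workers, or enter `Running`.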
pub(super) enum RecoveringStateAction {
    EnterInitializing(HashMap<WorkerId, ResetDatabaseResponse>),
    EnterRunning,
}

struct DatabaseRecoveryMetrics {
    recovery_failure_cnt: IntCounter,
    recovery_timer: Option<HistogramTimer>,
}

impl DatabaseRecoveryMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = format!("database {}", database_id);
        Self {
            recovery_failure_cnt: GLOBAL_META_METRICS
                .recovery_failure_cnt
                .with_label_values(&[database_id_str.as_str()]),
            recovery_timer: Some(
                GLOBAL_META_METRICS
                    .recovery_latency
                    .with_label_values(&[database_id_str.as_str()])
                    .start_timer(),
            ),
        }
    }
}

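/// The `reset_request_id` used for the first reset attempt of a database. Later attempts use
/// strictly larger ids so that stale reset responses can be told apart (see `next_retry`).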
const INITIAL_RESET_REQUEST_ID: u32 = 0;

impl DatabaseRecoveringState {
    pub(super) fn resetting(
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Self {
        let mut retry_backoff_strategy = get_retry_backoff_strategy();
        let backoff_future = retry_backoff_strategy.next().unwrap();
        let metrics = DatabaseRecoveryMetrics::new(database_id);
        metrics.recovery_failure_cnt.inc();

        Self {
            stage: DatabaseRecoveringStage::Resetting {
                remaining_workers: control_stream_manager
                    .reset_database(database_id, INITIAL_RESET_REQUEST_ID),
                reset_resps: Default::default(),
                reset_request_id: INITIAL_RESET_REQUEST_ID,
                backoff_future: Some(backoff_future),
            },
            next_reset_request_id: INITIAL_RESET_REQUEST_ID + 1,
            retry_backoff_strategy,
            metrics,
        }
    }

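    /// Returns the backoff future to wait on before the next reset attempt, together with a fresh
    /// `reset_request_id`. The id increases monotonically, so responses to earlier attempts are
    /// recognized as stale in `on_reset_database_resp` and ignored.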
    fn next_retry(&mut self) -> (RetryBackoffFuture, u32) {
        let backoff_future = self
            .retry_backoff_strategy
            .next()
            .expect("should not be empty");
        let request_id = self.next_reset_request_id;
        self.next_reset_request_id += 1;
        (backoff_future, request_id)
    }

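    /// Handles a `BarrierCompleteResponse` received while the database is recovering: ignored
    /// during `Resetting`, forwarded to the initial barrier collector during `Initializing`.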
    pub(super) fn barrier_collected(
        &mut self,
        database_id: DatabaseId,
        resp: BarrierCompleteResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting { .. } => {
                // ignore the collected barrier on resetting or backoff
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
            } => {
                let worker_id = resp.worker_id;
                initial_barrier_collector.collect_resp(resp);
                info!(
                    ?database_id,
                    %worker_id,
                    remaining_workers = ?initial_barrier_collector,
                    "initializing database barrier collected"
                );
            }
        }
    }

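    /// Called when a worker fails. During `Resetting`, the failed worker no longer needs to send a
    /// reset response, so it is simply removed from the remaining set; during `Initializing`, the
    /// decision is delegated to the initial barrier collector.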
    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers, ..
            } => {
                remaining_workers.remove(&worker_id);
                true
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => initial_barrier_collector.is_valid_after_worker_err(worker_id),
        }
    }

    pub(super) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                reset_request_id,
                ..
            } => {
                if resp.reset_request_id < *reset_request_id {
                    info!(
                        database_id = %resp.database_id,
                        %worker_id,
                        received_request_id = resp.reset_request_id,
                        ongoing_request_id = reset_request_id,
                        "ignore stale reset response"
                    );
                } else {
                    assert_eq!(resp.reset_request_id, *reset_request_id);
                    assert!(remaining_workers.remove(&worker_id));
                    reset_resps
                        .try_insert(worker_id, resp)
                        .expect("non-duplicate");
                }
            }
            DatabaseRecoveringStage::Initializing { .. } => {
                unreachable!("all reset resp should have been received in Resetting")
            }
        }
    }

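    /// Polls for the next state transition: returns `EnterInitializing` once the backoff has
    /// elapsed and all workers have acknowledged the reset, or `EnterRunning` once the initial
    /// barrier has been collected from all workers. Returns `Poll::Pending` otherwise.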
    pub(super) fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<RecoveringStateAction> {
        match &mut self.stage {
            DatabaseRecoveringStage::Resetting {
                remaining_workers,
                reset_resps,
                backoff_future: backoff_future_option,
                ..
            } => {
                let pass_backoff = if let Some(backoff_future) = backoff_future_option {
                    if backoff_future.poll_unpin(cx).is_ready() {
                        *backoff_future_option = None;
                        true
                    } else {
                        false
                    }
                } else {
                    true
                };
                if pass_backoff && remaining_workers.is_empty() {
                    return Poll::Ready(RecoveringStateAction::EnterInitializing(take(
                        reset_resps,
                    )));
                }
            }
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => {
                if initial_barrier_collector.is_collected() {
                    return Poll::Ready(RecoveringStateAction::EnterRunning);
                }
            }
        }
        Poll::Pending
    }

    pub(super) fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match &self.stage {
            DatabaseRecoveringStage::Resetting { .. } => None,
            DatabaseRecoveringStage::Initializing {
                initial_barrier_collector,
                ..
            } => Some(initial_barrier_collector.database_state()),
        }
    }
}

pub(crate) struct DatabaseStatusAction<'a, A> {
    control: &'a mut CheckpointControl,
    database_id: DatabaseId,
    pub(crate) action: A,
}

impl<A> DatabaseStatusAction<'_, A> {
    pub(crate) fn database_id(&self) -> DatabaseId {
        self.database_id
    }
}

impl CheckpointControl {
    pub(super) fn new_database_status_action<A>(
        &mut self,
        database_id: DatabaseId,
        action: A,
    ) -> DatabaseStatusAction<'_, A> {
        DatabaseStatusAction {
            control: self,
            database_id,
            action,
        }
    }
}

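/// Action that transitions a database into the `Resetting` stage, either from `Running` (on the
/// first reported failure) or from `Initializing` (when a retry is needed).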
pub(crate) struct EnterReset;

impl DatabaseStatusAction<'_, EnterReset> {
    pub(crate) fn enter(
        self,
        barrier_complete_output: Option<BarrierCompleteOutput>,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
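        // Acknowledge the output of the finished `BarrierCompletingTask`, if any, before resetting
        // the database.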
        if let Some(output) = barrier_complete_output {
            self.control.ack_completed(output);
        }
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                let reset_request_id = INITIAL_RESET_REQUEST_ID;
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                let metrics = DatabaseRecoveryMetrics::new(self.database_id);
                event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                    EventRecovery::database_recovery_start(self.database_id.as_raw_id()),
                )]);
                *database_status =
                    DatabaseCheckpointControlStatus::Recovering(DatabaseRecoveringState {
                        stage: DatabaseRecoveringStage::Resetting {
                            remaining_workers,
                            reset_resps: Default::default(),
                            reset_request_id,
                            backoff_future: None,
                        },
                        next_reset_request_id: reset_request_id + 1,
                        retry_backoff_strategy: get_retry_backoff_strategy(),
                        metrics,
                    });
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    unreachable!("should not enter resetting again")
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    event_log_manager_ref.add_event_logs(vec![Event::Recovery(
                        EventRecovery::database_recovery_failure(self.database_id.as_raw_id()),
                    )]);
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(self.database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                }
            },
        }
    }
}

impl CheckpointControl {
    pub(crate) fn on_report_failure(
        &mut self,
        database_id: DatabaseId,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Option<DatabaseStatusAction<'_, EnterReset>> {
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                Some(self.new_database_status_action(database_id, EnterReset))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Resetting { .. } => {
                    // ignore reported failure during resetting or backoff.
                    None
                }
                DatabaseRecoveringStage::Initializing { .. } => {
                    warn!(database_id = %database_id, "failure reported during initializing, reset the database again");
                    let (backoff_future, reset_request_id) = state.next_retry();
                    let remaining_workers =
                        control_stream_manager.reset_database(database_id, reset_request_id);
                    state.metrics.recovery_failure_cnt.inc();
                    state.stage = DatabaseRecoveringStage::Resetting {
                        remaining_workers,
                        reset_resps: Default::default(),
                        reset_request_id,
                        backoff_future: Some(backoff_future),
                    };
                    None
                }
            },
        }
    }
}

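/// Action that transitions a database from `Resetting` to `Initializing`, carrying the
/// `ResetDatabaseResponse` collected from each worker.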
pub(crate) struct EnterInitializing(pub(crate) HashMap<WorkerId, ResetDatabaseResponse>);

impl DatabaseStatusAction<'_, EnterInitializing> {
    pub(crate) fn enter(
        self,
        runtime_info: DatabaseRuntimeInfoSnapshot,
        control_stream_manager: &mut ControlStreamManager,
    ) {
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        let status = match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter initializing when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => match state.stage {
                DatabaseRecoveringStage::Initializing { .. } => {
                    unreachable!("can only enter initializing when resetting")
                }
                DatabaseRecoveringStage::Resetting { .. } => state,
            },
        };
        let DatabaseRuntimeInfoSnapshot {
            job_infos,
            mut state_table_committed_epochs,
            mut state_table_log_epochs,
            mut mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            mut source_splits,
            mut background_jobs,
            mut cdc_table_snapshot_split_assignment,
        } = runtime_info;
        let result: MetaResult<_> = try {
            let mut builder = FragmentEdgeBuilder::new(
                job_infos
                    .values()
                    .flat_map(|fragment_infos| fragment_infos.values()),
                control_stream_manager,
            );
            builder.add_relations(&fragment_relations);
            let mut edges = builder.build();
            control_stream_manager.inject_database_initial_barrier(
                self.database_id,
                job_infos,
                &mut state_table_committed_epochs,
                &mut state_table_log_epochs,
                &mut edges,
                &stream_actors,
                &mut source_splits,
                &mut background_jobs,
                &mut mv_depended_subscriptions,
                false,
                &self.control.hummock_version_stats,
                &mut cdc_table_snapshot_split_assignment,
            )?
        };
        match result {
            Ok(initial_barrier_collector) => {
                info!(node_to_collect = ?initial_barrier_collector, database_id = ?self.database_id, "database enter initializing");
                status.stage = DatabaseRecoveringStage::Initializing {
                    initial_barrier_collector: initial_barrier_collector.into(),
                };
            }
            Err(e) => {
                warn!(
                    database_id = %self.database_id,
                    e = %e.as_report(),
                    "failed to inject initial barrier"
                );
                let (backoff_future, reset_request_id) = status.next_retry();
                let remaining_workers =
                    control_stream_manager.reset_database(self.database_id, reset_request_id);
                status.metrics.recovery_failure_cnt.inc();
                status.stage = DatabaseRecoveringStage::Resetting {
                    remaining_workers,
                    reset_resps: Default::default(),
                    reset_request_id,
                    backoff_future: Some(backoff_future),
                };
            }
        }
    }

    pub(crate) fn remove(self) {
        self.control
            .databases
            .remove(&self.database_id)
            .expect("should exist");
        self.control
            .env
            .shared_actor_infos()
            .remove_database(self.database_id);
    }
}

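/// Action that transitions a database from `Initializing` back to `Running` once the initial
/// barrier has been collected from all workers.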
pub(crate) struct EnterRunning;

impl DatabaseStatusAction<'_, EnterRunning> {
    pub(crate) fn enter(self) {
        info!(database_id = ?self.database_id, "database enter running");
        let event_log_manager_ref = self.control.env.event_log_manager_ref();
        event_log_manager_ref.add_event_logs(vec![Event::Recovery(
            EventRecovery::database_recovery_success(self.database_id.as_raw_id()),
        )]);
        let database_status = self
            .control
            .databases
            .get_mut(&self.database_id)
            .expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not enter running again")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
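                // Placeholder stage: the `initial_barrier_collector` has to be moved out of
                // `state.stage` by value, so temporarily swap in an empty `Resetting` stage before
                // the whole database status is replaced below.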
                let temp_place_holder = DatabaseRecoveringStage::Resetting {
                    remaining_workers: Default::default(),
                    reset_resps: Default::default(),
                    reset_request_id: 0,
                    backoff_future: None,
                };
                match state.metrics.recovery_timer.take() {
                    Some(recovery_timer) => {
                        recovery_timer.observe_duration();
                    }
                    _ => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "database {} recovery latency timer taken twice",
                                self.database_id
                            )
                        } else {
                            warn!(database_id = %self.database_id, "failed to take recovery latency")
                        }
                    }
                }
                match replace(&mut state.stage, temp_place_holder) {
                    DatabaseRecoveringStage::Resetting { .. } => {
                        unreachable!("can only enter running during initializing")
                    }
                    DatabaseRecoveringStage::Initializing {
                        initial_barrier_collector,
                    } => {
                        *database_status = DatabaseCheckpointControlStatus::Running(
                            initial_barrier_collector.finish(),
                        );
                    }
                }
            }
        }
    }
}