risingwave_meta/backup_restore/backup_manager.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use arc_swap::ArcSwap;
use risingwave_backup::error::BackupError;
use risingwave_backup::storage::{MetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest};
use risingwave_common::bail;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::HummockRawObjectId;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use thiserror_ext::AsReport;
use tokio::task::JoinHandle;

use crate::MetaResult;
use crate::backup_restore::meta_snapshot_builder;
use crate::backup_restore::metrics::BackupManagerMetrics;
use crate::hummock::sequence::next_meta_backup_id;
use crate::hummock::{HummockManagerRef, HummockVersionSafePoint};
use crate::manager::{LocalNotification, MetaSrvEnv};
use crate::rpc::metrics::MetaMetrics;

pub enum BackupJobResult {
    Succeeded,
    Failed(BackupError),
}

/// `BackupJobHandle` tracks the running job.
struct BackupJobHandle {
    job_id: u64,
    #[expect(dead_code)]
    hummock_version_safe_point: HummockVersionSafePoint,
    start_time: Instant,
}

impl BackupJobHandle {
    pub fn new(job_id: u64, hummock_version_safe_point: HummockVersionSafePoint) -> Self {
        Self {
            job_id,
            hummock_version_safe_point,
            start_time: Instant::now(),
        }
    }
}

pub type BackupManagerRef = Arc<BackupManager>;
/// (url, dir)
type StoreConfig = (String, String);

/// `BackupManager` manages the lifecycle of all existing backups and the running backup job.
pub struct BackupManager {
    env: MetaSrvEnv,
    hummock_manager: HummockManagerRef,
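    /// Current backup storage and its config. Swapped atomically when system params change.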
    backup_store: ArcSwap<(ObjectStoreMetaSnapshotStorage, StoreConfig)>,
    /// Tracks the running backup job. Concurrent jobs are not supported.
    running_job_handle: tokio::sync::Mutex<Option<BackupJobHandle>>,
    metrics: BackupManagerMetrics,
    meta_metrics: Arc<MetaMetrics>,
    /// Info of the most recent job: (job id, status, message).
    latest_job_info: ArcSwap<(MetaBackupJobId, BackupJobStatus, String)>,
}

impl BackupManager {
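    /// Creates a `BackupManager` and spawns a task that listens for system params
    /// changes in order to hot-swap the backup storage when its config changes.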
    pub async fn new(
        env: MetaSrvEnv,
        hummock_manager: HummockManagerRef,
        metrics: Arc<MetaMetrics>,
        store_url: &str,
        store_dir: &str,
    ) -> MetaResult<Arc<Self>> {
        let store_config = (store_url.to_owned(), store_dir.to_owned());
        let store = create_snapshot_store(
            &store_config,
            metrics.object_store_metric.clone(),
            &env.opts.object_store_config,
        )
        .await?;
        tracing::info!(
            "backup manager initialized: url={}, dir={}",
            store_config.0,
            store_config.1
        );
        let instance = Arc::new(Self::with_store(
            env.clone(),
            hummock_manager,
            metrics,
            (store, store_config),
        ));
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let this = instance.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                if let LocalNotification::SystemParamsChange(p) = notification {
                    let new_config = (
                        p.backup_storage_url().to_owned(),
                        p.backup_storage_directory().to_owned(),
                    );
                    this.handle_new_config(new_config).await;
                }
            }
        });
        Ok(instance)
    }

    async fn handle_new_config(&self, new_config: StoreConfig) {
        if self.backup_store.load().1 == new_config {
            return;
        }
        if let Err(e) = self.set_store(new_config.clone()).await {
            // Retry is driven by the periodic system params notification.
            tracing::warn!(
                url = &new_config.0,
                dir = &new_config.1,
                error = %e.as_report(),
                "failed to apply new backup config",
            );
        }
    }

    fn with_store(
        env: MetaSrvEnv,
        hummock_manager: HummockManagerRef,
        meta_metrics: Arc<MetaMetrics>,
        backup_store: (ObjectStoreMetaSnapshotStorage, StoreConfig),
    ) -> Self {
        Self {
            env,
            hummock_manager,
            backup_store: ArcSwap::from_pointee(backup_store),
            running_job_handle: tokio::sync::Mutex::new(None),
            metrics: BackupManagerMetrics::default(),
            meta_metrics,
            latest_job_info: ArcSwap::from_pointee((0, BackupJobStatus::NotFound, "".into())),
        }
    }

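    /// Replaces the backup storage with one built from `config`. Callers that already
    /// loaded the previous storage keep using it via its `Arc` until they drop it.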
    pub async fn set_store(&self, config: StoreConfig) -> MetaResult<()> {
        let new_store = create_snapshot_store(
            &config,
            self.meta_metrics.object_store_metric.clone(),
            &self.env.opts.object_store_config,
        )
        .await?;
        tracing::info!(
            "new backup config is applied: url={}, dir={}",
            config.0,
            config.1
        );
        self.backup_store.store(Arc::new((new_store, config)));
        Ok(())
    }

    #[cfg(test)]
    pub async fn for_test(env: MetaSrvEnv, hummock_manager: HummockManagerRef) -> Self {
        Self::with_store(
            env,
            hummock_manager,
            Arc::new(MetaMetrics::default()),
            (
                risingwave_backup::storage::unused().await,
                StoreConfig::default(),
            ),
        )
    }

    /// Starts a backup job in the background and returns its job id. It's non-blocking.
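    ///
    /// Example (illustrative sketch, not a doc-test):
    /// ```ignore
    /// // Assumes an existing `BackupManagerRef`.
    /// let job_id = backup_manager.start_backup_job(Some("nightly".into())).await?;
    /// let (status, message) = backup_manager.get_backup_job_status(job_id);
    /// ```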
    pub async fn start_backup_job(
        self: &Arc<Self>,
        remarks: Option<String>,
    ) -> MetaResult<MetaBackupJobId> {
        let mut guard = self.running_job_handle.lock().await;
        if let Some(job) = (*guard).as_ref() {
            bail!(
                "concurrent backup job is not supported: existing job {}",
                job.job_id
            );
        }
        // The reasons to limit the number of meta snapshots are:
        // 1. Limit the size of `MetaSnapshotManifest`, which is kept in memory by
        //    `ObjectStoreMetaSnapshotStorage`.
        // 2. Limit the number of pinned objects returned by `list_pinned_object_ids`,
        //    which is subsequently used by GC.
        const MAX_META_SNAPSHOT_NUM: usize = 100;
        let current_number = self
            .backup_store
            .load()
            .0
            .manifest()
            .await
            .snapshot_metadata
            .len();
        if current_number > MAX_META_SNAPSHOT_NUM {
            bail!(
                "too many existing meta snapshots, expect at most {}",
                MAX_META_SNAPSHOT_NUM
            );
        }

        let job_id = next_meta_backup_id(&self.env).await?;
        self.latest_job_info
            .store(Arc::new((job_id, BackupJobStatus::Running, "".into())));
        let hummock_version_safe_point = self.hummock_manager.register_safe_point().await;
        // Ideally `BackupWorker` and its r/w IO could be made external to the meta node.
        // The justifications for keeping `BackupWorker` in the meta node are:
        // - It makes the meta node the only writer of backup storage, which eases implementation.
        // - The meta store is likely deployed on the same node as the meta node.
        // - The IO volume of a metadata snapshot is not expected to be large.
        // - Backup jobs are not expected to be frequent.
        BackupWorker::new(self.clone()).start(job_id, remarks);
        let job_handle = BackupJobHandle::new(job_id, hummock_version_safe_point);
        *guard = Some(job_handle);
        self.metrics.job_count.inc();
        Ok(job_id)
    }

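    /// Returns the status of the given job. Only the most recent job is tracked,
    /// so older job ids yield `NotFound`.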
    pub fn get_backup_job_status(&self, job_id: MetaBackupJobId) -> (BackupJobStatus, String) {
        let last = self.latest_job_info.load();
        if last.0 == job_id {
            return (last.1, last.2.clone());
        }
        (BackupJobStatus::NotFound, "".into())
    }

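    /// Records the job result and drops the job handle, which releases the hummock
    /// version safe point held for the job.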
    async fn finish_backup_job(&self, job_id: MetaBackupJobId, job_result: BackupJobResult) {
        // `job_handle` holds `hummock_version_safe_point` until the job is completed.
        let job_handle = self
            .take_job_handle_by_job_id(job_id)
            .await
            .expect("job id should match");
        let job_latency = job_handle.start_time.elapsed().as_secs_f64();
        match job_result {
            BackupJobResult::Succeeded => {
                self.metrics.job_latency_success.observe(job_latency);
                tracing::info!("backup job {} succeeded", job_id);
                self.env
                    .notification_manager()
                    .notify_hummock_without_version(
                        Operation::Update,
                        Info::MetaBackupManifestId(MetaBackupManifestId {
                            id: self.backup_store.load().0.manifest().await.manifest_id,
                        }),
                    );
                self.latest_job_info
                    .store(Arc::new((job_id, BackupJobStatus::Succeeded, "".into())));
            }
            BackupJobResult::Failed(e) => {
                self.metrics.job_latency_failure.observe(job_latency);
                let message = format!("backup job {} failed: {}", job_id, e.as_report());
                tracing::warn!(message);
                self.latest_job_info
                    .store(Arc::new((job_id, BackupJobStatus::Failed, message)));
            }
        }
    }

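    /// Takes the running job handle only if its job id matches `job_id`.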
    async fn take_job_handle_by_job_id(&self, job_id: u64) -> Option<BackupJobHandle> {
        let mut guard = self.running_job_handle.lock().await;
        match (*guard).as_ref() {
            None => {
                return None;
            }
            Some(job_handle) => {
                if job_handle.job_id != job_id {
                    return None;
                }
            }
        }
        guard.take()
    }

    /// Deletes existing backups from backup storage.
    pub async fn delete_backups(&self, ids: &[MetaSnapshotId]) -> MetaResult<()> {
        self.backup_store.load().0.delete(ids).await?;
        self.env
            .notification_manager()
            .notify_hummock_without_version(
                Operation::Update,
                Info::MetaBackupManifestId(MetaBackupManifestId {
                    id: self.backup_store.load().0.manifest().await.manifest_id,
                }),
            );
        Ok(())
    }

    /// Lists the ids of all objects required by backups.
    pub async fn list_pinned_object_ids(&self) -> HashSet<HummockRawObjectId> {
        self.backup_store
            .load()
            .0
            .manifest()
            .await
            .snapshot_metadata
            .iter()
            .flat_map(|s| s.objects.iter().copied())
            .collect()
    }

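    /// Returns the current manifest of the backup storage.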
    pub async fn manifest(&self) -> Arc<MetaSnapshotManifest> {
        self.backup_store.load().0.manifest().await
    }
}

/// `BackupWorker` creates a database snapshot.
struct BackupWorker {
    backup_manager: BackupManagerRef,
}

impl BackupWorker {
    fn new(backup_manager: BackupManagerRef) -> Self {
        Self { backup_manager }
    }

    fn start(self, job_id: u64, remarks: Option<String>) -> JoinHandle<()> {
        let backup_manager_clone = self.backup_manager.clone();
        let job = async move {
            let hummock_manager = backup_manager_clone.hummock_manager.clone();
            let hummock_version_builder = async move {
                hummock_manager
                    .on_current_version(|version| version.clone())
                    .await
            };
            let meta_store = backup_manager_clone.env.meta_store();
            let mut snapshot_builder =
                meta_snapshot_builder::MetaSnapshotV2Builder::new(meta_store);
            // Reuse job id as snapshot id.
            snapshot_builder
                .build(job_id, hummock_version_builder)
                .await?;
            let snapshot = snapshot_builder.finish()?;
            backup_manager_clone
                .backup_store
                .load()
                .0
                .create(&snapshot, remarks)
                .await?;
            Ok(BackupJobResult::Succeeded)
        };
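        // Run the job to completion; any error is converted into a failed job result.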
        tokio::spawn(async move {
            let job_result = job.await.unwrap_or_else(BackupJobResult::Failed);
            self.backup_manager
                .finish_backup_job(job_id, job_result)
                .await;
        })
    }
}

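/// Builds the snapshot storage: `config.0` is the object store url and `config.1` is
/// the directory within it.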
async fn create_snapshot_store(
    config: &StoreConfig,
    metric: Arc<ObjectStoreMetrics>,
    object_store_config: &ObjectStoreConfig,
) -> MetaResult<ObjectStoreMetaSnapshotStorage> {
    let object_store = Arc::new(
        build_remote_object_store(
            &config.0,
            metric,
            "Meta Backup",
            Arc::new(object_store_config.clone()),
        )
        .await,
    );
    let store = ObjectStoreMetaSnapshotStorage::new(&config.1, object_store).await?;
    Ok(store)
}