risingwave_meta_service/
backup_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16
17use risingwave_pb::backup_service::backup_service_server::BackupService;
18use risingwave_pb::backup_service::{
19    BackupMetaRequest, BackupMetaResponse, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse,
20    GetBackupJobStatusRequest, GetBackupJobStatusResponse, GetMetaSnapshotManifestRequest,
21    GetMetaSnapshotManifestResponse,
22};
23use tonic::{Request, Response, Status};
24
25use crate::backup_restore::BackupManagerRef;
26
27pub struct BackupServiceImpl {
28    backup_manager: BackupManagerRef,
29}
30
31impl BackupServiceImpl {
32    pub fn new(backup_manager: BackupManagerRef) -> Self {
33        Self { backup_manager }
34    }
35}
36
37#[async_trait::async_trait]
38impl BackupService for BackupServiceImpl {
39    async fn backup_meta(
40        &self,
41        request: Request<BackupMetaRequest>,
42    ) -> Result<Response<BackupMetaResponse>, Status> {
43        let remarks = request.into_inner().remarks;
44        let job_id = self.backup_manager.start_backup_job(remarks).await?;
45        Ok(Response::new(BackupMetaResponse { job_id }))
46    }
47
48    async fn get_backup_job_status(
49        &self,
50        request: Request<GetBackupJobStatusRequest>,
51    ) -> Result<Response<GetBackupJobStatusResponse>, Status> {
52        let job_id = request.into_inner().job_id;
53        let (job_status, message) = self.backup_manager.get_backup_job_status(job_id);
54        Ok(Response::new(GetBackupJobStatusResponse {
55            job_id,
56            job_status: job_status as _,
57            message,
58        }))
59    }
60
61    async fn delete_meta_snapshot(
62        &self,
63        request: Request<DeleteMetaSnapshotRequest>,
64    ) -> Result<Response<DeleteMetaSnapshotResponse>, Status> {
65        let snapshot_ids = request.into_inner().snapshot_ids;
66        self.backup_manager.delete_backups(&snapshot_ids).await?;
67        Ok(Response::new(DeleteMetaSnapshotResponse {}))
68    }
69
70    async fn get_meta_snapshot_manifest(
71        &self,
72        _request: Request<GetMetaSnapshotManifestRequest>,
73    ) -> Result<Response<GetMetaSnapshotManifestResponse>, Status> {
74        Ok(Response::new(GetMetaSnapshotManifestResponse {
75            manifest: Some(self.backup_manager.manifest().deref().into()),
76        }))
77    }
78}