// File: risingwave_backup/storage.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::config::ObjectStoreConfig;
20use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
21use risingwave_object_store::object::{
22    InMemObjectStore, MonitoredObjectStore, ObjectError, ObjectStoreImpl, ObjectStoreRef,
23};
24
25use crate::meta_snapshot::{MetaSnapshot, Metadata};
26use crate::{
27    BackupError, BackupResult, MetaSnapshotId, MetaSnapshotManifest, MetaSnapshotMetadata,
28};
29
30pub type MetaSnapshotStorageRef = Arc<ObjectStoreMetaSnapshotStorage>;
31
32#[async_trait::async_trait]
33pub trait MetaSnapshotStorage: 'static + Sync + Send {
34    /// Creates a snapshot.
35    async fn create<S: Metadata>(
36        &self,
37        snapshot: &MetaSnapshot<S>,
38        remarks: Option<String>,
39    ) -> BackupResult<()>;
40
41    /// Gets a snapshot by id.
42    async fn get<S: Metadata>(&self, id: MetaSnapshotId) -> BackupResult<MetaSnapshot<S>>;
43
44    /// Gets local snapshot manifest.
45    fn manifest(&self) -> Arc<MetaSnapshotManifest>;
46
47    /// Refreshes local snapshot manifest.
48    async fn refresh_manifest(&self) -> BackupResult<()>;
49
50    /// Deletes snapshots by ids.
51    async fn delete(&self, ids: &[MetaSnapshotId]) -> BackupResult<()>;
52}
53
54#[derive(Clone)]
55pub struct ObjectStoreMetaSnapshotStorage {
56    path: String,
57    store: ObjectStoreRef,
58    manifest: Arc<parking_lot::RwLock<Arc<MetaSnapshotManifest>>>,
59}
60
61// TODO #6482: purge stale snapshots that is not in manifest.
62impl ObjectStoreMetaSnapshotStorage {
63    pub async fn new(path: &str, store: ObjectStoreRef) -> BackupResult<Self> {
64        let instance = Self {
65            path: path.to_owned(),
66            store,
67            manifest: Default::default(),
68        };
69        instance.refresh_manifest().await?;
70        Ok(instance)
71    }
72
73    async fn update_manifest(&self, new_manifest: MetaSnapshotManifest) -> BackupResult<()> {
74        let bytes =
75            serde_json::to_vec(&new_manifest).map_err(|e| BackupError::Encoding(e.into()))?;
76        self.store
77            .upload(&self.get_manifest_path(), bytes.into())
78            .await?;
79        *self.manifest.write() = Arc::new(new_manifest);
80        Ok(())
81    }
82
83    async fn get_manifest(&self) -> BackupResult<Option<MetaSnapshotManifest>> {
84        let manifest_path = self.get_manifest_path();
85        let bytes = match self.store.read(&manifest_path, ..).await {
86            Ok(bytes) => bytes,
87            Err(e) => {
88                if e.is_object_not_found_error() {
89                    return Ok(None);
90                }
91                return Err(e.into());
92            }
93        };
94        let manifest: MetaSnapshotManifest =
95            serde_json::from_slice(&bytes).map_err(|e| BackupError::Encoding(e.into()))?;
96        Ok(Some(manifest))
97    }
98
99    fn get_manifest_path(&self) -> String {
100        format!("{}/manifest.json", self.path)
101    }
102
103    fn get_snapshot_path(&self, id: MetaSnapshotId) -> String {
104        format!("{}/{}.snapshot", self.path, id)
105    }
106
107    #[allow(dead_code)]
108    fn get_snapshot_id_from_path(path: &str) -> MetaSnapshotId {
109        let split = path.split(&['/', '.']).collect_vec();
110        debug_assert!(split.len() > 2);
111        debug_assert!(split[split.len() - 1] == "snapshot");
112        split[split.len() - 2]
113            .parse::<MetaSnapshotId>()
114            .expect("valid meta snapshot id")
115    }
116}
117
118#[async_trait::async_trait]
119impl MetaSnapshotStorage for ObjectStoreMetaSnapshotStorage {
120    async fn create<S: Metadata>(
121        &self,
122        snapshot: &MetaSnapshot<S>,
123        remarks: Option<String>,
124    ) -> BackupResult<()> {
125        let path = self.get_snapshot_path(snapshot.id);
126        self.store.upload(&path, snapshot.encode()?.into()).await?;
127
128        // update manifest last
129        let mut new_manifest = (**self.manifest.read()).clone();
130        new_manifest.manifest_id += 1;
131        new_manifest
132            .snapshot_metadata
133            .push(MetaSnapshotMetadata::new(
134                snapshot.id,
135                snapshot.metadata.hummock_version_ref(),
136                snapshot.format_version,
137                remarks,
138            ));
139        self.update_manifest(new_manifest).await?;
140        Ok(())
141    }
142
143    async fn get<S: Metadata>(&self, id: MetaSnapshotId) -> BackupResult<MetaSnapshot<S>> {
144        let path = self.get_snapshot_path(id);
145        let data = self.store.read(&path, ..).await?;
146        MetaSnapshot::decode(&data)
147    }
148
149    fn manifest(&self) -> Arc<MetaSnapshotManifest> {
150        self.manifest.read().clone()
151    }
152
153    async fn refresh_manifest(&self) -> BackupResult<()> {
154        if let Some(manifest) = self.get_manifest().await? {
155            let mut guard = self.manifest.write();
156            if manifest.manifest_id > guard.manifest_id {
157                *guard = Arc::new(manifest);
158            }
159        }
160        Ok(())
161    }
162
163    async fn delete(&self, ids: &[MetaSnapshotId]) -> BackupResult<()> {
164        // update manifest first
165        let to_delete: HashSet<MetaSnapshotId> = HashSet::from_iter(ids.iter().cloned());
166        let mut new_manifest = (**self.manifest.read()).clone();
167        new_manifest.manifest_id += 1;
168        new_manifest
169            .snapshot_metadata
170            .retain(|m| !to_delete.contains(&m.id));
171        self.update_manifest(new_manifest).await?;
172
173        let paths = ids
174            .iter()
175            .map(|id| self.get_snapshot_path(*id))
176            .collect_vec();
177        self.store.delete_objects(&paths).await?;
178        Ok(())
179    }
180}
181
182impl From<ObjectError> for BackupError {
183    fn from(e: ObjectError) -> Self {
184        BackupError::BackupStorage(e.into())
185    }
186}
187
188// #[cfg(test)]
189pub async fn unused() -> ObjectStoreMetaSnapshotStorage {
190    ObjectStoreMetaSnapshotStorage::new(
191        "",
192        Arc::new(ObjectStoreImpl::InMem(MonitoredObjectStore::new(
193            InMemObjectStore::new(),
194            Arc::new(ObjectStoreMetrics::unused()),
195            Arc::new(ObjectStoreConfig::default()),
196        ))),
197    )
198    .await
199    .unwrap()
200}