risingwave_backup/
storage.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::config::ObjectStoreConfig;
20use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
21use risingwave_object_store::object::{
22    InMemObjectStore, MonitoredObjectStore, ObjectError, ObjectStoreImpl, ObjectStoreRef,
23};
24use tokio::sync::RwLock;
25
26use crate::meta_snapshot::{MetaSnapshot, Metadata};
27use crate::{
28    BackupError, BackupResult, MetaSnapshotId, MetaSnapshotManifest, MetaSnapshotMetadata,
29};
30
31pub type MetaSnapshotStorageRef = Arc<ObjectStoreMetaSnapshotStorage>;
32
33#[async_trait::async_trait]
34pub trait MetaSnapshotStorage: 'static + Sync + Send {
35    /// Creates a snapshot.
36    async fn create<S: Metadata>(
37        &self,
38        snapshot: &MetaSnapshot<S>,
39        remarks: Option<String>,
40    ) -> BackupResult<()>;
41
42    /// Gets a snapshot by id.
43    async fn get<S: Metadata>(&self, id: MetaSnapshotId) -> BackupResult<MetaSnapshot<S>>;
44
45    /// Gets local snapshot manifest.
46    async fn manifest(&self) -> Arc<MetaSnapshotManifest>;
47
48    /// Refreshes local snapshot manifest.
49    async fn refresh_manifest(&self) -> BackupResult<()>;
50
51    /// Deletes snapshots by ids.
52    async fn delete(&self, ids: &[MetaSnapshotId]) -> BackupResult<()>;
53}
54
55#[derive(Clone)]
56pub struct ObjectStoreMetaSnapshotStorage {
57    path: String,
58    store: ObjectStoreRef,
59    manifest: Arc<RwLock<Arc<MetaSnapshotManifest>>>,
60}
61
// TODO #6482: purge stale snapshots that are not in the manifest.
63impl ObjectStoreMetaSnapshotStorage {
64    pub async fn new(path: &str, store: ObjectStoreRef) -> BackupResult<Self> {
65        let instance = Self {
66            path: path.to_owned(),
67            store,
68            manifest: Default::default(),
69        };
70        instance.refresh_manifest().await?;
71        Ok(instance)
72    }
73
74    async fn update_manifest(
75        &self,
76        update: impl FnOnce(MetaSnapshotManifest) -> MetaSnapshotManifest,
77    ) -> BackupResult<()> {
78        let mut guard = self.manifest.write().await;
79        let new_manifest = update((**guard).clone());
80        let bytes =
81            serde_json::to_vec(&new_manifest).map_err(|e| BackupError::Encoding(e.into()))?;
82        self.store
83            .upload(&self.get_manifest_path(), bytes.into())
84            .await?;
85        *guard = Arc::new(new_manifest);
86        Ok(())
87    }
88
89    async fn get_manifest(&self) -> BackupResult<Option<MetaSnapshotManifest>> {
90        let manifest_path = self.get_manifest_path();
91        let bytes = match self.store.read(&manifest_path, ..).await {
92            Ok(bytes) => bytes,
93            Err(e) => {
94                if e.is_object_not_found_error() {
95                    return Ok(None);
96                }
97                return Err(e.into());
98            }
99        };
100        let manifest: MetaSnapshotManifest =
101            serde_json::from_slice(&bytes).map_err(|e| BackupError::Encoding(e.into()))?;
102        Ok(Some(manifest))
103    }
104
105    fn get_manifest_path(&self) -> String {
106        format!("{}/manifest.json", self.path)
107    }
108
109    fn get_snapshot_path(&self, id: MetaSnapshotId) -> String {
110        format!("{}/{}.snapshot", self.path, id)
111    }
112
113    #[allow(dead_code)]
114    fn get_snapshot_id_from_path(path: &str) -> MetaSnapshotId {
115        let split = path.split(&['/', '.']).collect_vec();
116        debug_assert!(split.len() > 2);
117        debug_assert!(split[split.len() - 1] == "snapshot");
118        split[split.len() - 2]
119            .parse::<MetaSnapshotId>()
120            .expect("valid meta snapshot id")
121    }
122}
123
124#[async_trait::async_trait]
125impl MetaSnapshotStorage for ObjectStoreMetaSnapshotStorage {
126    async fn create<S: Metadata>(
127        &self,
128        snapshot: &MetaSnapshot<S>,
129        remarks: Option<String>,
130    ) -> BackupResult<()> {
131        let path = self.get_snapshot_path(snapshot.id);
132        self.store.upload(&path, snapshot.encode()?.into()).await?;
133        self.update_manifest(|mut manifest: MetaSnapshotManifest| {
134            manifest.manifest_id += 1;
135            manifest.snapshot_metadata.push(MetaSnapshotMetadata::new(
136                snapshot.id,
137                snapshot.metadata.hummock_version_ref(),
138                snapshot.format_version,
139                remarks,
140                snapshot.metadata.table_change_log_object_ids().into_iter(),
141            ));
142            manifest
143        })
144        .await?;
145        Ok(())
146    }
147
148    async fn get<S: Metadata>(&self, id: MetaSnapshotId) -> BackupResult<MetaSnapshot<S>> {
149        let path = self.get_snapshot_path(id);
150        let data = self.store.read(&path, ..).await?;
151        MetaSnapshot::decode(&data)
152    }
153
154    async fn manifest(&self) -> Arc<MetaSnapshotManifest> {
155        self.manifest.read().await.clone()
156    }
157
158    async fn refresh_manifest(&self) -> BackupResult<()> {
159        if let Some(manifest) = self.get_manifest().await? {
160            let mut guard = self.manifest.write().await;
161            if manifest.manifest_id > guard.manifest_id {
162                *guard = Arc::new(manifest);
163            }
164        }
165        Ok(())
166    }
167
168    async fn delete(&self, ids: &[MetaSnapshotId]) -> BackupResult<()> {
169        let to_delete: HashSet<MetaSnapshotId> = HashSet::from_iter(ids.iter().cloned());
170        self.update_manifest(|mut manifest: MetaSnapshotManifest| {
171            manifest.manifest_id += 1;
172            manifest
173                .snapshot_metadata
174                .retain(|m| !to_delete.contains(&m.id));
175            manifest
176        })
177        .await?;
178        let paths = ids
179            .iter()
180            .map(|id| self.get_snapshot_path(*id))
181            .collect_vec();
182        self.store.delete_objects(&paths).await?;
183        Ok(())
184    }
185}
186
187impl From<ObjectError> for BackupError {
188    fn from(e: ObjectError) -> Self {
189        BackupError::BackupStorage(e.into())
190    }
191}
192
193// #[cfg(test)]
194pub async fn unused() -> ObjectStoreMetaSnapshotStorage {
195    ObjectStoreMetaSnapshotStorage::new(
196        "",
197        Arc::new(ObjectStoreImpl::InMem(MonitoredObjectStore::new(
198            InMemObjectStore::for_test(),
199            Arc::new(ObjectStoreMetrics::unused()),
200            Arc::new(ObjectStoreConfig::default()),
201        ))),
202    )
203    .await
204    .unwrap()
205}