// risingwave_backup/storage.rs

use std::collections::HashSet;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::{
    InMemObjectStore, MonitoredObjectStore, ObjectError, ObjectStoreImpl, ObjectStoreRef,
};

use crate::meta_snapshot::{MetaSnapshot, Metadata};
use crate::{
    BackupError, BackupResult, MetaSnapshotId, MetaSnapshotManifest, MetaSnapshotMetadata,
};
29
30pub type MetaSnapshotStorageRef = Arc<ObjectStoreMetaSnapshotStorage>;
31
32#[async_trait::async_trait]
33pub trait MetaSnapshotStorage: 'static + Sync + Send {
34 async fn create<S: Metadata>(
36 &self,
37 snapshot: &MetaSnapshot<S>,
38 remarks: Option<String>,
39 ) -> BackupResult<()>;
40
41 async fn get<S: Metadata>(&self, id: MetaSnapshotId) -> BackupResult<MetaSnapshot<S>>;
43
44 fn manifest(&self) -> Arc<MetaSnapshotManifest>;
46
47 async fn refresh_manifest(&self) -> BackupResult<()>;
49
50 async fn delete(&self, ids: &[MetaSnapshotId]) -> BackupResult<()>;
52}
53
54#[derive(Clone)]
55pub struct ObjectStoreMetaSnapshotStorage {
56 path: String,
57 store: ObjectStoreRef,
58 manifest: Arc<parking_lot::RwLock<Arc<MetaSnapshotManifest>>>,
59}
60
61impl ObjectStoreMetaSnapshotStorage {
63 pub async fn new(path: &str, store: ObjectStoreRef) -> BackupResult<Self> {
64 let instance = Self {
65 path: path.to_owned(),
66 store,
67 manifest: Default::default(),
68 };
69 instance.refresh_manifest().await?;
70 Ok(instance)
71 }
72
73 async fn update_manifest(&self, new_manifest: MetaSnapshotManifest) -> BackupResult<()> {
74 let bytes =
75 serde_json::to_vec(&new_manifest).map_err(|e| BackupError::Encoding(e.into()))?;
76 self.store
77 .upload(&self.get_manifest_path(), bytes.into())
78 .await?;
79 *self.manifest.write() = Arc::new(new_manifest);
80 Ok(())
81 }
82
83 async fn get_manifest(&self) -> BackupResult<Option<MetaSnapshotManifest>> {
84 let manifest_path = self.get_manifest_path();
85 let bytes = match self.store.read(&manifest_path, ..).await {
86 Ok(bytes) => bytes,
87 Err(e) => {
88 if e.is_object_not_found_error() {
89 return Ok(None);
90 }
91 return Err(e.into());
92 }
93 };
94 let manifest: MetaSnapshotManifest =
95 serde_json::from_slice(&bytes).map_err(|e| BackupError::Encoding(e.into()))?;
96 Ok(Some(manifest))
97 }
98
99 fn get_manifest_path(&self) -> String {
100 format!("{}/manifest.json", self.path)
101 }
102
103 fn get_snapshot_path(&self, id: MetaSnapshotId) -> String {
104 format!("{}/{}.snapshot", self.path, id)
105 }
106
107 #[allow(dead_code)]
108 fn get_snapshot_id_from_path(path: &str) -> MetaSnapshotId {
109 let split = path.split(&['/', '.']).collect_vec();
110 debug_assert!(split.len() > 2);
111 debug_assert!(split[split.len() - 1] == "snapshot");
112 split[split.len() - 2]
113 .parse::<MetaSnapshotId>()
114 .expect("valid meta snapshot id")
115 }
116}
117
118#[async_trait::async_trait]
119impl MetaSnapshotStorage for ObjectStoreMetaSnapshotStorage {
120 async fn create<S: Metadata>(
121 &self,
122 snapshot: &MetaSnapshot<S>,
123 remarks: Option<String>,
124 ) -> BackupResult<()> {
125 let path = self.get_snapshot_path(snapshot.id);
126 self.store.upload(&path, snapshot.encode()?.into()).await?;
127
128 let mut new_manifest = (**self.manifest.read()).clone();
130 new_manifest.manifest_id += 1;
131 new_manifest
132 .snapshot_metadata
133 .push(MetaSnapshotMetadata::new(
134 snapshot.id,
135 snapshot.metadata.hummock_version_ref(),
136 snapshot.format_version,
137 remarks,
138 ));
139 self.update_manifest(new_manifest).await?;
140 Ok(())
141 }
142
143 async fn get<S: Metadata>(&self, id: MetaSnapshotId) -> BackupResult<MetaSnapshot<S>> {
144 let path = self.get_snapshot_path(id);
145 let data = self.store.read(&path, ..).await?;
146 MetaSnapshot::decode(&data)
147 }
148
149 fn manifest(&self) -> Arc<MetaSnapshotManifest> {
150 self.manifest.read().clone()
151 }
152
153 async fn refresh_manifest(&self) -> BackupResult<()> {
154 if let Some(manifest) = self.get_manifest().await? {
155 let mut guard = self.manifest.write();
156 if manifest.manifest_id > guard.manifest_id {
157 *guard = Arc::new(manifest);
158 }
159 }
160 Ok(())
161 }
162
163 async fn delete(&self, ids: &[MetaSnapshotId]) -> BackupResult<()> {
164 let to_delete: HashSet<MetaSnapshotId> = HashSet::from_iter(ids.iter().cloned());
166 let mut new_manifest = (**self.manifest.read()).clone();
167 new_manifest.manifest_id += 1;
168 new_manifest
169 .snapshot_metadata
170 .retain(|m| !to_delete.contains(&m.id));
171 self.update_manifest(new_manifest).await?;
172
173 let paths = ids
174 .iter()
175 .map(|id| self.get_snapshot_path(*id))
176 .collect_vec();
177 self.store.delete_objects(&paths).await?;
178 Ok(())
179 }
180}
181
182impl From<ObjectError> for BackupError {
183 fn from(e: ObjectError) -> Self {
184 BackupError::BackupStorage(e.into())
185 }
186}
187
188pub async fn unused() -> ObjectStoreMetaSnapshotStorage {
190 ObjectStoreMetaSnapshotStorage::new(
191 "",
192 Arc::new(ObjectStoreImpl::InMem(MonitoredObjectStore::new(
193 InMemObjectStore::new(),
194 Arc::new(ObjectStoreMetrics::unused()),
195 Arc::new(ObjectStoreConfig::default()),
196 ))),
197 )
198 .await
199 .unwrap()
200}