risingwave_backup/
storage.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::config::ObjectStoreConfig;
20use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
21use risingwave_object_store::object::{
22 InMemObjectStore, MonitoredObjectStore, ObjectError, ObjectStoreImpl, ObjectStoreRef,
23};
24use tokio::sync::RwLock;
25
26use crate::meta_snapshot::{MetaSnapshot, Metadata};
27use crate::{
28 BackupError, BackupResult, MetaSnapshotId, MetaSnapshotManifest, MetaSnapshotMetadata,
29};
30
31pub type MetaSnapshotStorageRef = Arc<ObjectStoreMetaSnapshotStorage>;
32
33#[async_trait::async_trait]
34pub trait MetaSnapshotStorage: 'static + Sync + Send {
35 async fn create<S: Metadata>(
37 &self,
38 snapshot: &MetaSnapshot<S>,
39 remarks: Option<String>,
40 ) -> BackupResult<()>;
41
42 async fn get<S: Metadata>(&self, id: MetaSnapshotId) -> BackupResult<MetaSnapshot<S>>;
44
45 async fn manifest(&self) -> Arc<MetaSnapshotManifest>;
47
48 async fn refresh_manifest(&self) -> BackupResult<()>;
50
51 async fn delete(&self, ids: &[MetaSnapshotId]) -> BackupResult<()>;
53}
54
55#[derive(Clone)]
56pub struct ObjectStoreMetaSnapshotStorage {
57 path: String,
58 store: ObjectStoreRef,
59 manifest: Arc<RwLock<Arc<MetaSnapshotManifest>>>,
60}
61
62impl ObjectStoreMetaSnapshotStorage {
64 pub async fn new(path: &str, store: ObjectStoreRef) -> BackupResult<Self> {
65 let instance = Self {
66 path: path.to_owned(),
67 store,
68 manifest: Default::default(),
69 };
70 instance.refresh_manifest().await?;
71 Ok(instance)
72 }
73
74 async fn update_manifest(
75 &self,
76 update: impl FnOnce(MetaSnapshotManifest) -> MetaSnapshotManifest,
77 ) -> BackupResult<()> {
78 let mut guard = self.manifest.write().await;
79 let new_manifest = update((**guard).clone());
80 let bytes =
81 serde_json::to_vec(&new_manifest).map_err(|e| BackupError::Encoding(e.into()))?;
82 self.store
83 .upload(&self.get_manifest_path(), bytes.into())
84 .await?;
85 *guard = Arc::new(new_manifest);
86 Ok(())
87 }
88
89 async fn get_manifest(&self) -> BackupResult<Option<MetaSnapshotManifest>> {
90 let manifest_path = self.get_manifest_path();
91 let bytes = match self.store.read(&manifest_path, ..).await {
92 Ok(bytes) => bytes,
93 Err(e) => {
94 if e.is_object_not_found_error() {
95 return Ok(None);
96 }
97 return Err(e.into());
98 }
99 };
100 let manifest: MetaSnapshotManifest =
101 serde_json::from_slice(&bytes).map_err(|e| BackupError::Encoding(e.into()))?;
102 Ok(Some(manifest))
103 }
104
105 fn get_manifest_path(&self) -> String {
106 format!("{}/manifest.json", self.path)
107 }
108
109 fn get_snapshot_path(&self, id: MetaSnapshotId) -> String {
110 format!("{}/{}.snapshot", self.path, id)
111 }
112
113 #[allow(dead_code)]
114 fn get_snapshot_id_from_path(path: &str) -> MetaSnapshotId {
115 let split = path.split(&['/', '.']).collect_vec();
116 debug_assert!(split.len() > 2);
117 debug_assert!(split[split.len() - 1] == "snapshot");
118 split[split.len() - 2]
119 .parse::<MetaSnapshotId>()
120 .expect("valid meta snapshot id")
121 }
122}
123
124#[async_trait::async_trait]
125impl MetaSnapshotStorage for ObjectStoreMetaSnapshotStorage {
126 async fn create<S: Metadata>(
127 &self,
128 snapshot: &MetaSnapshot<S>,
129 remarks: Option<String>,
130 ) -> BackupResult<()> {
131 let path = self.get_snapshot_path(snapshot.id);
132 self.store.upload(&path, snapshot.encode()?.into()).await?;
133 self.update_manifest(|mut manifest: MetaSnapshotManifest| {
134 manifest.manifest_id += 1;
135 manifest.snapshot_metadata.push(MetaSnapshotMetadata::new(
136 snapshot.id,
137 snapshot.metadata.hummock_version_ref(),
138 snapshot.format_version,
139 remarks,
140 snapshot.metadata.table_change_log_object_ids().into_iter(),
141 ));
142 manifest
143 })
144 .await?;
145 Ok(())
146 }
147
148 async fn get<S: Metadata>(&self, id: MetaSnapshotId) -> BackupResult<MetaSnapshot<S>> {
149 let path = self.get_snapshot_path(id);
150 let data = self.store.read(&path, ..).await?;
151 MetaSnapshot::decode(&data)
152 }
153
154 async fn manifest(&self) -> Arc<MetaSnapshotManifest> {
155 self.manifest.read().await.clone()
156 }
157
158 async fn refresh_manifest(&self) -> BackupResult<()> {
159 if let Some(manifest) = self.get_manifest().await? {
160 let mut guard = self.manifest.write().await;
161 if manifest.manifest_id > guard.manifest_id {
162 *guard = Arc::new(manifest);
163 }
164 }
165 Ok(())
166 }
167
168 async fn delete(&self, ids: &[MetaSnapshotId]) -> BackupResult<()> {
169 let to_delete: HashSet<MetaSnapshotId> = HashSet::from_iter(ids.iter().cloned());
170 self.update_manifest(|mut manifest: MetaSnapshotManifest| {
171 manifest.manifest_id += 1;
172 manifest
173 .snapshot_metadata
174 .retain(|m| !to_delete.contains(&m.id));
175 manifest
176 })
177 .await?;
178 let paths = ids
179 .iter()
180 .map(|id| self.get_snapshot_path(*id))
181 .collect_vec();
182 self.store.delete_objects(&paths).await?;
183 Ok(())
184 }
185}
186
187impl From<ObjectError> for BackupError {
188 fn from(e: ObjectError) -> Self {
189 BackupError::BackupStorage(e.into())
190 }
191}
192
193pub async fn unused() -> ObjectStoreMetaSnapshotStorage {
195 ObjectStoreMetaSnapshotStorage::new(
196 "",
197 Arc::new(ObjectStoreImpl::InMem(MonitoredObjectStore::new(
198 InMemObjectStore::for_test(),
199 Arc::new(ObjectStoreMetrics::unused()),
200 Arc::new(ObjectStoreConfig::default()),
201 ))),
202 )
203 .await
204 .unwrap()
205}