risingwave_meta/backup_restore/
backup_manager.rs1use std::collections::HashSet;
16use std::sync::Arc;
17use std::time::Instant;
18
19use arc_swap::ArcSwap;
20use risingwave_backup::error::BackupError;
21use risingwave_backup::storage::{MetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
22use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest};
23use risingwave_common::bail;
24use risingwave_common::config::ObjectStoreConfig;
25use risingwave_common::system_param::reader::SystemParamsRead;
26use risingwave_hummock_sdk::HummockRawObjectId;
27use risingwave_object_store::object::build_remote_object_store;
28use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
29use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId};
30use risingwave_pb::meta::subscribe_response::{Info, Operation};
31use thiserror_ext::AsReport;
32use tokio::task::JoinHandle;
33
34use crate::MetaResult;
35use crate::backup_restore::meta_snapshot_builder;
36use crate::backup_restore::metrics::BackupManagerMetrics;
37use crate::hummock::sequence::next_meta_backup_id;
38use crate::hummock::{HummockManagerRef, HummockVersionSafePoint};
39use crate::manager::{LocalNotification, MetaSrvEnv};
40use crate::rpc::metrics::MetaMetrics;
41
42pub enum BackupJobResult {
43 Succeeded,
44 Failed(BackupError),
45}
46
47struct BackupJobHandle {
49 job_id: u64,
50 #[expect(dead_code)]
51 hummock_version_safe_point: HummockVersionSafePoint,
52 start_time: Instant,
53}
54
55impl BackupJobHandle {
56 pub fn new(job_id: u64, hummock_version_safe_point: HummockVersionSafePoint) -> Self {
57 Self {
58 job_id,
59 hummock_version_safe_point,
60 start_time: Instant::now(),
61 }
62 }
63}
64
65pub type BackupManagerRef = Arc<BackupManager>;
66type StoreConfig = (String, String);
68
69pub struct BackupManager {
71 env: MetaSrvEnv,
72 hummock_manager: HummockManagerRef,
73 backup_store: ArcSwap<(ObjectStoreMetaSnapshotStorage, StoreConfig)>,
74 running_job_handle: tokio::sync::Mutex<Option<BackupJobHandle>>,
76 metrics: BackupManagerMetrics,
77 meta_metrics: Arc<MetaMetrics>,
78 latest_job_info: ArcSwap<(MetaBackupJobId, BackupJobStatus, String)>,
80}
81
82impl BackupManager {
83 pub async fn new(
84 env: MetaSrvEnv,
85 hummock_manager: HummockManagerRef,
86 metrics: Arc<MetaMetrics>,
87 store_url: &str,
88 store_dir: &str,
89 ) -> MetaResult<Arc<Self>> {
90 let store_config = (store_url.to_owned(), store_dir.to_owned());
91 let store = create_snapshot_store(
92 &store_config,
93 metrics.object_store_metric.clone(),
94 &env.opts.object_store_config,
95 )
96 .await?;
97 tracing::info!(
98 "backup manager initialized: url={}, dir={}",
99 store_config.0,
100 store_config.1
101 );
102 let instance = Arc::new(Self::with_store(
103 env.clone(),
104 hummock_manager,
105 metrics,
106 (store, store_config),
107 ));
108 let (local_notification_tx, mut local_notification_rx) =
109 tokio::sync::mpsc::unbounded_channel();
110 env.notification_manager()
111 .insert_local_sender(local_notification_tx);
112 let this = instance.clone();
113 tokio::spawn(async move {
114 loop {
115 match local_notification_rx.recv().await {
116 Some(notification) => {
117 if let LocalNotification::SystemParamsChange(p) = notification {
118 let new_config = (
119 p.backup_storage_url().to_owned(),
120 p.backup_storage_directory().to_owned(),
121 );
122 this.handle_new_config(new_config).await;
123 }
124 }
125 None => {
126 return;
127 }
128 }
129 }
130 });
131 Ok(instance)
132 }
133
134 async fn handle_new_config(&self, new_config: StoreConfig) {
135 if self.backup_store.load().1 == new_config {
136 return;
137 }
138 if let Err(e) = self.set_store(new_config.clone()).await {
139 tracing::warn!(
141 url = &new_config.0,
142 dir = &new_config.1,
143 error = %e.as_report(),
144 "failed to apply new backup config",
145 );
146 }
147 }
148
149 fn with_store(
150 env: MetaSrvEnv,
151 hummock_manager: HummockManagerRef,
152 meta_metrics: Arc<MetaMetrics>,
153 backup_store: (ObjectStoreMetaSnapshotStorage, StoreConfig),
154 ) -> Self {
155 Self {
156 env,
157 hummock_manager,
158 backup_store: ArcSwap::from_pointee(backup_store),
159 running_job_handle: tokio::sync::Mutex::new(None),
160 metrics: BackupManagerMetrics::default(),
161 meta_metrics,
162 latest_job_info: ArcSwap::from_pointee((0, BackupJobStatus::NotFound, "".into())),
163 }
164 }
165
166 pub async fn set_store(&self, config: StoreConfig) -> MetaResult<()> {
167 let new_store = create_snapshot_store(
168 &config,
169 self.meta_metrics.object_store_metric.clone(),
170 &self.env.opts.object_store_config,
171 )
172 .await?;
173 tracing::info!(
174 "new backup config is applied: url={}, dir={}",
175 config.0,
176 config.1
177 );
178 self.backup_store.store(Arc::new((new_store, config)));
179 Ok(())
180 }
181
182 #[cfg(test)]
183 pub async fn for_test(env: MetaSrvEnv, hummock_manager: HummockManagerRef) -> Self {
184 Self::with_store(
185 env,
186 hummock_manager,
187 Arc::new(MetaMetrics::default()),
188 (
189 risingwave_backup::storage::unused().await,
190 StoreConfig::default(),
191 ),
192 )
193 }
194
195 pub async fn start_backup_job(
198 self: &Arc<Self>,
199 remarks: Option<String>,
200 ) -> MetaResult<MetaBackupJobId> {
201 let mut guard = self.running_job_handle.lock().await;
202 if let Some(job) = (*guard).as_ref() {
203 bail!(format!(
204 "concurrent backup job is not supported: existent job {}",
205 job.job_id
206 ));
207 }
208 const MAX_META_SNAPSHOT_NUM: usize = 100;
214 let current_number = self
215 .backup_store
216 .load()
217 .0
218 .manifest()
219 .await
220 .snapshot_metadata
221 .len();
222 if current_number > MAX_META_SNAPSHOT_NUM {
223 bail!(format!(
224 "too many existent meta snapshots, expect at most {}",
225 MAX_META_SNAPSHOT_NUM
226 ))
227 }
228
229 let job_id = next_meta_backup_id(&self.env).await?;
230 self.latest_job_info
231 .store(Arc::new((job_id, BackupJobStatus::Running, "".into())));
232 let hummock_version_safe_point = self.hummock_manager.register_safe_point().await;
233 BackupWorker::new(self.clone()).start(job_id, remarks);
240 let job_handle = BackupJobHandle::new(job_id, hummock_version_safe_point);
241 *guard = Some(job_handle);
242 self.metrics.job_count.inc();
243 Ok(job_id)
244 }
245
246 pub fn get_backup_job_status(&self, job_id: MetaBackupJobId) -> (BackupJobStatus, String) {
247 let last = self.latest_job_info.load();
248 if last.0 == job_id {
249 return (last.1, last.2.clone());
250 }
251 (BackupJobStatus::NotFound, "".into())
252 }
253
254 async fn finish_backup_job(&self, job_id: MetaBackupJobId, job_result: BackupJobResult) {
255 let job_handle = self
257 .take_job_handle_by_job_id(job_id)
258 .await
259 .expect("job id should match");
260 let job_latency = job_handle.start_time.elapsed().as_secs_f64();
261 match job_result {
262 BackupJobResult::Succeeded => {
263 self.metrics.job_latency_success.observe(job_latency);
264 tracing::info!("succeeded backup job {}", job_id);
265 self.env
266 .notification_manager()
267 .notify_hummock_without_version(
268 Operation::Update,
269 Info::MetaBackupManifestId(MetaBackupManifestId {
270 id: self.backup_store.load().0.manifest().await.manifest_id,
271 }),
272 );
273 self.latest_job_info.store(Arc::new((
274 job_id,
275 BackupJobStatus::Succeeded,
276 "".into(),
277 )));
278 }
279 BackupJobResult::Failed(e) => {
280 self.metrics.job_latency_failure.observe(job_latency);
281 let message = format!("failed backup job {}: {}", job_id, e.as_report());
282 tracing::warn!(message);
283 self.latest_job_info
284 .store(Arc::new((job_id, BackupJobStatus::Failed, message)));
285 }
286 }
287 }
288
289 async fn take_job_handle_by_job_id(&self, job_id: u64) -> Option<BackupJobHandle> {
290 let mut guard = self.running_job_handle.lock().await;
291 match (*guard).as_ref() {
292 None => {
293 return None;
294 }
295 Some(job_handle) => {
296 if job_handle.job_id != job_id {
297 return None;
298 }
299 }
300 }
301 guard.take()
302 }
303
304 pub async fn delete_backups(&self, ids: &[MetaSnapshotId]) -> MetaResult<()> {
306 self.backup_store.load().0.delete(ids).await?;
307 self.env
308 .notification_manager()
309 .notify_hummock_without_version(
310 Operation::Update,
311 Info::MetaBackupManifestId(MetaBackupManifestId {
312 id: self.backup_store.load().0.manifest().await.manifest_id,
313 }),
314 );
315 Ok(())
316 }
317
318 pub async fn list_pinned_object_ids(&self) -> HashSet<HummockRawObjectId> {
320 self.backup_store
321 .load()
322 .0
323 .manifest()
324 .await
325 .snapshot_metadata
326 .iter()
327 .flat_map(|s| s.objects.iter().copied())
328 .collect()
329 }
330
331 pub async fn manifest(&self) -> Arc<MetaSnapshotManifest> {
332 self.backup_store.load().0.manifest().await
333 }
334}
335
336struct BackupWorker {
338 backup_manager: BackupManagerRef,
339}
340
341impl BackupWorker {
342 fn new(backup_manager: BackupManagerRef) -> Self {
343 Self { backup_manager }
344 }
345
346 fn start(self, job_id: u64, remarks: Option<String>) -> JoinHandle<()> {
347 let backup_manager_clone = self.backup_manager.clone();
348 let job = async move {
349 let hummock_manager = backup_manager_clone.hummock_manager.clone();
350 let hummock_version_builder = async move {
351 hummock_manager
352 .on_current_version(|version| version.clone())
353 .await
354 };
355 let meta_store = backup_manager_clone.env.meta_store();
356 let mut snapshot_builder =
357 meta_snapshot_builder::MetaSnapshotV2Builder::new(meta_store);
358 snapshot_builder
360 .build(job_id, hummock_version_builder)
361 .await?;
362 let snapshot = snapshot_builder.finish()?;
363 backup_manager_clone
364 .backup_store
365 .load()
366 .0
367 .create(&snapshot, remarks)
368 .await?;
369 Ok(BackupJobResult::Succeeded)
370 };
371 tokio::spawn(async move {
372 let job_result = job.await.unwrap_or_else(BackupJobResult::Failed);
373 self.backup_manager
374 .finish_backup_job(job_id, job_result)
375 .await;
376 })
377 }
378}
379
380async fn create_snapshot_store(
381 config: &StoreConfig,
382 metric: Arc<ObjectStoreMetrics>,
383 object_store_config: &ObjectStoreConfig,
384) -> MetaResult<ObjectStoreMetaSnapshotStorage> {
385 let object_store = Arc::new(
386 build_remote_object_store(
387 &config.0,
388 metric,
389 "Meta Backup",
390 Arc::new(object_store_config.clone()),
391 )
392 .await,
393 );
394 let store = ObjectStoreMetaSnapshotStorage::new(&config.1, object_store).await?;
395 Ok(store)
396}