risingwave_meta/backup_restore/restore.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_backup::MetaSnapshotId;
18use risingwave_backup::error::{BackupError, BackupResult};
19use risingwave_backup::meta_snapshot::Metadata;
20use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
21use risingwave_common::config::{MetaBackend, ObjectStoreConfig};
22use risingwave_hummock_sdk::version::HummockVersion;
23use risingwave_hummock_sdk::version_checkpoint_path;
24use risingwave_object_store::object::build_remote_object_store;
25use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
26use risingwave_pb::hummock::PbHummockVersionCheckpoint;
27use thiserror_ext::AsReport;
28
29use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2};
30use crate::backup_restore::restore_impl::{Loader, Writer};
31use crate::backup_restore::utils::{get_backup_store, get_meta_store};
32use crate::controller::SqlMetaStore;
33
34/// Command-line arguments for restore.
35#[derive(clap::Args, Debug, Clone)]
36pub struct RestoreOpts {
37 /// Id of snapshot used to restore. Available snapshots can be found in
38 /// <`storage_directory>/manifest.json`.
39 #[clap(long)]
40 pub meta_snapshot_id: u64,
41 /// Type of meta store to restore.
42 #[clap(long, value_enum, default_value_t = MetaBackend::Mem)]
43 pub meta_store_type: MetaBackend,
44 #[clap(long, default_value_t = String::from(""))]
45 pub sql_endpoint: String,
46 /// Username of sql backend, required when meta backend set to MySQL or PostgreSQL.
47 #[clap(long, default_value = "")]
48 pub sql_username: String,
49 /// Password of sql backend, required when meta backend set to MySQL or PostgreSQL.
50 #[clap(long, default_value = "")]
51 pub sql_password: String,
52 /// Database of sql backend, required when meta backend set to MySQL or PostgreSQL.
53 #[clap(long, default_value = "")]
54 pub sql_database: String,
55 /// Url of storage to fetch meta snapshot from.
56 #[clap(long)]
57 pub backup_storage_url: String,
58 /// Directory of storage to fetch meta snapshot from.
59 #[clap(long, default_value_t = String::from("backup"))]
60 pub backup_storage_directory: String,
61 /// Url of storage to restore hummock version to.
62 #[clap(long)]
63 pub hummock_storage_url: String,
64 /// Directory of storage to restore hummock version to.
65 #[clap(long, default_value_t = String::from("hummock_001"))]
66 pub hummock_storage_directory: String,
67 /// Print the target snapshot, but won't restore to meta store.
68 #[clap(long)]
69 pub dry_run: bool,
70 /// The read timeout for object store
71 #[clap(long, default_value_t = 600000)]
72 pub read_attempt_timeout_ms: u64,
73 /// The maximum number of read retry attempts for the object store.
74 #[clap(long, default_value_t = 3)]
75 pub read_retry_attempts: u64,
76 #[clap(long, default_value_t = false)]
77 /// When enabled, some system parameters of in the restored meta store will be overwritten.
78 /// Specifically, system parameters `state_store`, `data_directory`, `backup_storage_url` and `backup_storage_directory` will be overwritten
79 /// with the specified opts `hummock_storage_url`, `hummock_storage_directory`, `overwrite_backup_storage_url` and `overwrite_backup_storage_directory`.
80 pub overwrite_hummock_storage_endpoint: bool,
81 #[clap(long, required = false)]
82 pub overwrite_backup_storage_url: Option<String>,
83 #[clap(long, required = false)]
84 pub overwrite_backup_storage_directory: Option<String>,
85}
86
87async fn restore_hummock_version(
88 hummock_storage_url: &str,
89 hummock_storage_directory: &str,
90 hummock_version: &HummockVersion,
91) -> BackupResult<()> {
92 let object_store = Arc::new(
93 build_remote_object_store(
94 hummock_storage_url,
95 Arc::new(ObjectStoreMetrics::unused()),
96 "Version Checkpoint",
97 Arc::new(ObjectStoreConfig::default()),
98 )
99 .await,
100 );
101 let checkpoint_path = version_checkpoint_path(hummock_storage_directory);
102 let checkpoint = PbHummockVersionCheckpoint {
103 version: Some(hummock_version.into()),
104 // Ignore stale objects. Full GC will clear them.
105 stale_objects: Default::default(),
106 };
107 use prost::Message;
108 let buf = checkpoint.encode_to_vec();
109 object_store
110 .upload(&checkpoint_path, buf.into())
111 .await
112 .map_err(|e| BackupError::StateStorage(e.into()))?;
113 Ok(())
114}
115
116/// Restores a meta store.
117/// Uses `meta_store` and `backup_store` if provided.
118/// Otherwise creates them based on `opts`.
119async fn restore_impl(
120 opts: RestoreOpts,
121 meta_store: Option<SqlMetaStore>,
122 backup_store: Option<MetaSnapshotStorageRef>,
123) -> BackupResult<()> {
124 if cfg!(not(test)) {
125 assert!(meta_store.is_none());
126 assert!(backup_store.is_none());
127 }
128 let meta_store = match meta_store {
129 None => get_meta_store(opts.clone()).await?,
130 Some(m) => m,
131 };
132 let backup_store = match backup_store {
133 None => get_backup_store(opts.clone()).await?,
134 Some(b) => b,
135 };
136 let target_id = opts.meta_snapshot_id;
137 let snapshot_list = &backup_store.manifest().snapshot_metadata;
138 if !snapshot_list.iter().any(|m| m.id == target_id) {
139 return Err(BackupError::Other(anyhow::anyhow!(
140 "snapshot id {} not found",
141 target_id
142 )));
143 }
144
145 let format_version = match snapshot_list.iter().find(|m| m.id == target_id) {
146 None => {
147 return Err(BackupError::Other(anyhow::anyhow!(
148 "snapshot id {} not found",
149 target_id
150 )));
151 }
152 Some(s) => s.format_version,
153 };
154 if format_version < 2 {
155 unimplemented!("not supported: write model V1 to meta store V2");
156 } else {
157 dispatch(
158 target_id,
159 &opts,
160 LoaderV2::new(backup_store),
161 WriterModelV2ToMetaStoreV2::new(meta_store.to_owned()),
162 )
163 .await?;
164 }
165
166 Ok(())
167}
168
169async fn dispatch<L: Loader<S>, W: Writer<S>, S: Metadata>(
170 target_id: MetaSnapshotId,
171 opts: &RestoreOpts,
172 loader: L,
173 writer: W,
174) -> BackupResult<()> {
175 // Validate parameters.
176 if opts.overwrite_hummock_storage_endpoint
177 && (opts.overwrite_backup_storage_url.is_none()
178 || opts.overwrite_backup_storage_directory.is_none())
179 {
180 return Err(BackupError::Other(anyhow::anyhow!("overwrite_hummock_storage_endpoint, overwrite_backup_storage_url, overwrite_backup_storage_directory must be set simultaneously".to_owned())));
181 }
182
183 // Restore meta store.
184 let target_snapshot = loader.load(target_id).await?;
185 if opts.dry_run {
186 return Ok(());
187 }
188 let hummock_version = target_snapshot.metadata.hummock_version_ref().clone();
189 writer.write(target_snapshot).await?;
190 if opts.overwrite_hummock_storage_endpoint {
191 writer
192 .overwrite(
193 &format!("hummock+{}", opts.hummock_storage_url),
194 &opts.hummock_storage_directory,
195 opts.overwrite_backup_storage_url.as_ref().unwrap(),
196 opts.overwrite_backup_storage_directory.as_ref().unwrap(),
197 )
198 .await?;
199 }
200
201 // Restore object store.
202 restore_hummock_version(
203 &opts.hummock_storage_url,
204 &opts.hummock_storage_directory,
205 &hummock_version,
206 )
207 .await?;
208 Ok(())
209}
210
211pub async fn restore(opts: RestoreOpts) -> BackupResult<()> {
212 tracing::info!("restore with opts: {:#?}", opts);
213 let result = restore_impl(opts, None, None).await;
214 match &result {
215 Ok(_) => {
216 tracing::info!("command succeeded");
217 }
218 Err(e) => {
219 tracing::warn!(error = %e.as_report(), "command failed");
220 }
221 }
222 result
223}
224
225// #[cfg(test)]
226// mod tests {
227//
228// // use risingwave_backup::meta_snapshot_v1::MetaSnapshotV1;
229// // use risingwave_common::config::{MetaBackend, SystemConfig};
230// // use risingwave_pb::meta::SystemParams;
231// //
232// // use crate::backup_restore::RestoreOpts;
233//
234// // type MetaSnapshot = MetaSnapshotV1;
235//
236// // fn get_restore_opts() -> RestoreOpts {
237// // RestoreOpts {
238// // meta_snapshot_id: 1,
239// // meta_store_type: MetaBackend::Mem,
240// // sql_endpoint: "".to_string(),
241// // sql_username: "".to_string(),
242// // sql_password: "".to_string(),
243// // sql_database: "".to_string(),
244// // backup_storage_url: "memory".to_string(),
245// // backup_storage_directory: "".to_string(),
246// // hummock_storage_url: "memory".to_string(),
247// // hummock_storage_directory: "".to_string(),
248// // dry_run: false,
249// // read_attempt_timeout_ms: 60000,
250// // read_retry_attempts: 3,
251// // }
252// // }
253//
254// // fn get_system_params() -> SystemParams {
255// // SystemParams {
256// // state_store: Some("state_store".into()),
257// // data_directory: Some("data_directory".into()),
258// // use_new_object_prefix_strategy: Some(true),
259// // backup_storage_url: Some("backup_storage_url".into()),
260// // backup_storage_directory: Some("backup_storage_directory".into()),
261// // ..SystemConfig::default().into_init_system_params()
262// // }
263// // }
264//
265// // TODO: support in-memory sql restore tests.
266// // #[tokio::test]
267// // async fn test_restore_basic() {
268// // let opts = get_restore_opts();
269// // let backup_store = get_backup_store(opts.clone()).await.unwrap();
270// // let nonempty_meta_store = get_meta_store(opts.clone()).await.unwrap();
271// // dispatch_meta_store!(nonempty_meta_store.clone(), store, {
272// // let stats = HummockVersionStats::default();
273// // stats.insert(&store).await.unwrap();
274// // });
275// // let empty_meta_store = get_meta_store(opts.clone()).await.unwrap();
276// // let system_param = get_system_params();
277// // let snapshot = MetaSnapshot {
278// // id: opts.meta_snapshot_id,
279// // metadata: ClusterMetadata {
280// // hummock_version: {
281// // let mut version = HummockVersion::default();
282// // version.id = HummockVersionId::new(123);
283// // version
284// // },
285// // system_param: system_param.clone(),
286// // ..Default::default()
287// // },
288// // ..Default::default()
289// // };
290// //
291// // // target snapshot not found
292// // restore_impl(opts.clone(), None, Some(backup_store.clone()))
293// // .await
294// // .unwrap_err();
295// //
296// // backup_store.create(&snapshot, None).await.unwrap();
297// // restore_impl(opts.clone(), None, Some(backup_store.clone()))
298// // .await
299// // .unwrap();
300// //
301// // // target meta store not empty
302// // restore_impl(
303// // opts.clone(),
304// // Some(nonempty_meta_store),
305// // Some(backup_store.clone()),
306// // )
307// // .await
308// // .unwrap_err();
309// //
310// // restore_impl(
311// // opts.clone(),
312// // Some(empty_meta_store.clone()),
313// // Some(backup_store.clone()),
314// // )
315// // .await
316// // .unwrap();
317// //
318// // dispatch_meta_store!(empty_meta_store, store, {
319// // let restored_system_param = SystemParams::get(&store).await.unwrap().unwrap();
320// // assert_eq!(restored_system_param, system_param);
321// // });
322// // }
323// //
324// // #[tokio::test]
325// // async fn test_restore_default_cf() {
326// // let opts = get_restore_opts();
327// // let backup_store = get_backup_store(opts.clone()).await.unwrap();
328// // let snapshot = MetaSnapshot {
329// // id: opts.meta_snapshot_id,
330// // metadata: ClusterMetadata {
331// // default_cf: HashMap::from([(vec![1u8, 2u8], memcomparable::to_vec(&10).unwrap())]),
332// // system_param: get_system_params(),
333// // ..Default::default()
334// // },
335// // ..Default::default()
336// // };
337// // backup_store.create(&snapshot, None).await.unwrap();
338// //
339// // // `snapshot_2` is a superset of `snapshot`
340// // let mut snapshot_2 = MetaSnapshot {
341// // id: snapshot.id + 1,
342// // ..snapshot.clone()
343// // };
344// // snapshot_2
345// // .metadata
346// // .default_cf
347// // .insert(vec![1u8, 2u8], memcomparable::to_vec(&10).unwrap());
348// // snapshot_2
349// // .metadata
350// // .default_cf
351// // .insert(vec![10u8, 20u8], memcomparable::to_vec(&10).unwrap());
352// // backup_store.create(&snapshot_2, None).await.unwrap();
353// // let empty_meta_store = get_meta_store(opts.clone()).await.unwrap();
354// // restore_impl(
355// // opts.clone(),
356// // Some(empty_meta_store.clone()),
357// // Some(backup_store.clone()),
358// // )
359// // .await
360// // .unwrap();
361// // dispatch_meta_store!(empty_meta_store, store, {
362// // let mut kvs = store
363// // .list_cf(DEFAULT_COLUMN_FAMILY)
364// // .await
365// // .unwrap()
366// // .into_iter()
367// // .map(|(_, v)| v)
368// // .collect_vec();
369// // kvs.sort();
370// // assert_eq!(
371// // kvs,
372// // vec![
373// // memcomparable::to_vec(&10).unwrap(),
374// // memcomparable::to_vec(&10).unwrap()
375// // ]
376// // );
377// // });
378// // }
379// //
380// // #[tokio::test]
381// // #[should_panic]
382// // async fn test_sanity_check_superset_requirement() {
383// // let opts = get_restore_opts();
384// // let backup_store = get_backup_store(opts.clone()).await.unwrap();
385// // let snapshot = MetaSnapshot {
386// // id: opts.meta_snapshot_id,
387// // metadata: ClusterMetadata {
388// // default_cf: HashMap::from([(vec![1u8, 2u8], memcomparable::to_vec(&10).unwrap())]),
389// // system_param: get_system_params(),
390// // ..Default::default()
391// // },
392// // ..Default::default()
393// // };
394// // backup_store.create(&snapshot, None).await.unwrap();
395// //
396// // // violate superset requirement
397// // let mut snapshot_2 = MetaSnapshot {
398// // id: snapshot.id + 1,
399// // ..Default::default()
400// // };
401// // snapshot_2
402// // .metadata
403// // .default_cf
404// // .insert(vec![10u8, 20u8], memcomparable::to_vec(&1).unwrap());
405// // backup_store.create(&snapshot_2, None).await.unwrap();
406// // restore_impl(opts.clone(), None, Some(backup_store.clone()))
407// // .await
408// // .unwrap();
409// // }
410// //
411// // #[tokio::test]
412// // #[should_panic]
413// // async fn test_sanity_check_monotonicity_requirement() {
414// // let opts = get_restore_opts();
415// // let backup_store = get_backup_store(opts.clone()).await.unwrap();
416// // let snapshot = MetaSnapshot {
417// // id: opts.meta_snapshot_id,
418// // metadata: ClusterMetadata {
419// // default_cf: HashMap::from([(vec![1u8, 2u8], memcomparable::to_vec(&10).unwrap())]),
420// // system_param: get_system_params(),
421// // ..Default::default()
422// // },
423// // ..Default::default()
424// // };
425// // backup_store.create(&snapshot, None).await.unwrap();
426// //
427// // // violate monotonicity requirement
428// // let mut snapshot_2 = MetaSnapshot {
429// // id: snapshot.id + 1,
430// // ..Default::default()
431// // };
432// // snapshot_2
433// // .metadata
434// // .default_cf
435// // .insert(vec![1u8, 2u8], memcomparable::to_vec(&9).unwrap());
436// // backup_store.create(&snapshot_2, None).await.unwrap();
437// // restore_impl(opts.clone(), None, Some(backup_store.clone()))
438// // .await
439// // .unwrap();
440// // }
441// //
442// // #[tokio::test]
443// // async fn test_dry_run() {
444// // let mut opts = get_restore_opts();
445// // assert!(!opts.dry_run);
446// // opts.dry_run = true;
447// // let backup_store = get_backup_store(opts.clone()).await.unwrap();
448// // let empty_meta_store = get_meta_store(opts.clone()).await.unwrap();
449// // let system_param = get_system_params();
450// // let snapshot = MetaSnapshot {
451// // id: opts.meta_snapshot_id,
452// // metadata: ClusterMetadata {
453// // default_cf: HashMap::from([
454// // (
455// // "some_key_1".as_bytes().to_vec(),
456// // memcomparable::to_vec(&10).unwrap(),
457// // ),
458// // (
459// // "some_key_2".as_bytes().to_vec(),
460// // memcomparable::to_vec(&"some_value_2".to_string()).unwrap(),
461// // ),
462// // ]),
463// // hummock_version: {
464// // let mut version = HummockVersion::default();
465// // version.id = HummockVersionId::new(123);
466// // version
467// // },
468// // system_param: system_param.clone(),
469// // ..Default::default()
470// // },
471// // ..Default::default()
472// // };
473// // backup_store.create(&snapshot, None).await.unwrap();
474// // restore_impl(
475// // opts.clone(),
476// // Some(empty_meta_store.clone()),
477// // Some(backup_store.clone()),
478// // )
479// // .await
480// // .unwrap();
481// //
482// // dispatch_meta_store!(empty_meta_store, store, {
483// // assert!(SystemParams::get(&store).await.unwrap().is_none());
484// // });
485// // }
486// }