risingwave_meta/backup_restore/
restore.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_backup::MetaSnapshotId;
18use risingwave_backup::error::{BackupError, BackupResult};
19use risingwave_backup::meta_snapshot::Metadata;
20use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
21use risingwave_common::config::{MetaBackend, ObjectStoreConfig};
22use risingwave_hummock_sdk::version::HummockVersion;
23use risingwave_hummock_sdk::version_checkpoint_path;
24use risingwave_object_store::object::build_remote_object_store;
25use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
26use risingwave_pb::hummock::PbHummockVersionCheckpoint;
27use thiserror_ext::AsReport;
28
29use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2};
30use crate::backup_restore::restore_impl::{Loader, Writer};
31use crate::backup_restore::utils::{get_backup_store, get_meta_store};
32use crate::controller::SqlMetaStore;
33
34/// Command-line arguments for restore.
35#[derive(clap::Args, Debug, Clone)]
36pub struct RestoreOpts {
37    /// Id of snapshot used to restore. Available snapshots can be found in
38    /// <`storage_directory>/manifest.json`.
39    #[clap(long)]
40    pub meta_snapshot_id: u64,
41    /// Type of meta store to restore.
42    #[clap(long, value_enum, default_value_t = MetaBackend::Mem)]
43    pub meta_store_type: MetaBackend,
44    #[clap(long, default_value_t = String::from(""))]
45    pub sql_endpoint: String,
46    /// Username of sql backend, required when meta backend set to MySQL or PostgreSQL.
47    #[clap(long, default_value = "")]
48    pub sql_username: String,
49    /// Password of sql backend, required when meta backend set to MySQL or PostgreSQL.
50    #[clap(long, default_value = "")]
51    pub sql_password: String,
52    /// Database of sql backend, required when meta backend set to MySQL or PostgreSQL.
53    #[clap(long, default_value = "")]
54    pub sql_database: String,
55    /// Url of storage to fetch meta snapshot from.
56    #[clap(long)]
57    pub backup_storage_url: String,
58    /// Directory of storage to fetch meta snapshot from.
59    #[clap(long, default_value_t = String::from("backup"))]
60    pub backup_storage_directory: String,
61    /// Url of storage to restore hummock version to.
62    #[clap(long)]
63    pub hummock_storage_url: String,
64    /// Directory of storage to restore hummock version to.
65    #[clap(long, default_value_t = String::from("hummock_001"))]
66    pub hummock_storage_directory: String,
67    /// Print the target snapshot, but won't restore to meta store.
68    #[clap(long)]
69    pub dry_run: bool,
70    /// The read timeout for object store
71    #[clap(long, default_value_t = 600000)]
72    pub read_attempt_timeout_ms: u64,
73    /// The maximum number of read retry attempts for the object store.
74    #[clap(long, default_value_t = 3)]
75    pub read_retry_attempts: u64,
76    #[clap(long, default_value_t = false)]
77    /// When enabled, some system parameters of in the restored meta store will be overwritten.
78    /// Specifically, system parameters `state_store`, `data_directory`, `backup_storage_url` and `backup_storage_directory` will be overwritten
79    /// with the specified opts `hummock_storage_url`, `hummock_storage_directory`, `overwrite_backup_storage_url` and `overwrite_backup_storage_directory`.
80    pub overwrite_hummock_storage_endpoint: bool,
81    #[clap(long, required = false)]
82    pub overwrite_backup_storage_url: Option<String>,
83    #[clap(long, required = false)]
84    pub overwrite_backup_storage_directory: Option<String>,
85}
86
87async fn restore_hummock_version(
88    hummock_storage_url: &str,
89    hummock_storage_directory: &str,
90    hummock_version: &HummockVersion,
91) -> BackupResult<()> {
92    let object_store = Arc::new(
93        build_remote_object_store(
94            hummock_storage_url,
95            Arc::new(ObjectStoreMetrics::unused()),
96            "Version Checkpoint",
97            Arc::new(ObjectStoreConfig::default()),
98        )
99        .await,
100    );
101    let checkpoint_path = version_checkpoint_path(hummock_storage_directory);
102    let checkpoint = PbHummockVersionCheckpoint {
103        version: Some(hummock_version.into()),
104        // Ignore stale objects. Full GC will clear them.
105        stale_objects: Default::default(),
106    };
107    use prost::Message;
108    let buf = checkpoint.encode_to_vec();
109    object_store
110        .upload(&checkpoint_path, buf.into())
111        .await
112        .map_err(|e| BackupError::StateStorage(e.into()))?;
113    Ok(())
114}
115
116/// Restores a meta store.
117/// Uses `meta_store` and `backup_store` if provided.
118/// Otherwise creates them based on `opts`.
119async fn restore_impl(
120    opts: RestoreOpts,
121    meta_store: Option<SqlMetaStore>,
122    backup_store: Option<MetaSnapshotStorageRef>,
123) -> BackupResult<()> {
124    if cfg!(not(test)) {
125        assert!(meta_store.is_none());
126        assert!(backup_store.is_none());
127    }
128    let meta_store = match meta_store {
129        None => get_meta_store(opts.clone()).await?,
130        Some(m) => m,
131    };
132    let backup_store = match backup_store {
133        None => get_backup_store(opts.clone()).await?,
134        Some(b) => b,
135    };
136    let target_id = opts.meta_snapshot_id;
137    let snapshot_list = &backup_store.manifest().snapshot_metadata;
138    if !snapshot_list.iter().any(|m| m.id == target_id) {
139        return Err(BackupError::Other(anyhow::anyhow!(
140            "snapshot id {} not found",
141            target_id
142        )));
143    }
144
145    let format_version = match snapshot_list.iter().find(|m| m.id == target_id) {
146        None => {
147            return Err(BackupError::Other(anyhow::anyhow!(
148                "snapshot id {} not found",
149                target_id
150            )));
151        }
152        Some(s) => s.format_version,
153    };
154    if format_version < 2 {
155        unimplemented!("not supported: write model V1 to meta store V2");
156    } else {
157        dispatch(
158            target_id,
159            &opts,
160            LoaderV2::new(backup_store),
161            WriterModelV2ToMetaStoreV2::new(meta_store.to_owned()),
162        )
163        .await?;
164    }
165
166    Ok(())
167}
168
169async fn dispatch<L: Loader<S>, W: Writer<S>, S: Metadata>(
170    target_id: MetaSnapshotId,
171    opts: &RestoreOpts,
172    loader: L,
173    writer: W,
174) -> BackupResult<()> {
175    // Validate parameters.
176    if opts.overwrite_hummock_storage_endpoint
177        && (opts.overwrite_backup_storage_url.is_none()
178            || opts.overwrite_backup_storage_directory.is_none())
179    {
180        return Err(BackupError::Other(anyhow::anyhow!("overwrite_hummock_storage_endpoint, overwrite_backup_storage_url, overwrite_backup_storage_directory must be set simultaneously".to_owned())));
181    }
182
183    // Restore meta store.
184    let target_snapshot = loader.load(target_id).await?;
185    if opts.dry_run {
186        return Ok(());
187    }
188    let hummock_version = target_snapshot.metadata.hummock_version_ref().clone();
189    writer.write(target_snapshot).await?;
190    if opts.overwrite_hummock_storage_endpoint {
191        writer
192            .overwrite(
193                &format!("hummock+{}", opts.hummock_storage_url),
194                &opts.hummock_storage_directory,
195                opts.overwrite_backup_storage_url.as_ref().unwrap(),
196                opts.overwrite_backup_storage_directory.as_ref().unwrap(),
197            )
198            .await?;
199    }
200
201    // Restore object store.
202    restore_hummock_version(
203        &opts.hummock_storage_url,
204        &opts.hummock_storage_directory,
205        &hummock_version,
206    )
207    .await?;
208    Ok(())
209}
210
211pub async fn restore(opts: RestoreOpts) -> BackupResult<()> {
212    tracing::info!("restore with opts: {:#?}", opts);
213    let result = restore_impl(opts, None, None).await;
214    match &result {
215        Ok(_) => {
216            tracing::info!("command succeeded");
217        }
218        Err(e) => {
219            tracing::warn!(error = %e.as_report(), "command failed");
220        }
221    }
222    result
223}
224
225// #[cfg(test)]
226// mod tests {
227//
228//     // use risingwave_backup::meta_snapshot_v1::MetaSnapshotV1;
229//     // use risingwave_common::config::{MetaBackend, SystemConfig};
230//     // use risingwave_pb::meta::SystemParams;
231//     //
232//     // use crate::backup_restore::RestoreOpts;
233//
234//     // type MetaSnapshot = MetaSnapshotV1;
235//
236//     // fn get_restore_opts() -> RestoreOpts {
237//     //     RestoreOpts {
238//     //         meta_snapshot_id: 1,
239//     //         meta_store_type: MetaBackend::Mem,
240//     //         sql_endpoint: "".to_string(),
241//     //         sql_username: "".to_string(),
242//     //         sql_password: "".to_string(),
243//     //         sql_database: "".to_string(),
244//     //         backup_storage_url: "memory".to_string(),
245//     //         backup_storage_directory: "".to_string(),
246//     //         hummock_storage_url: "memory".to_string(),
247//     //         hummock_storage_directory: "".to_string(),
248//     //         dry_run: false,
249//     //         read_attempt_timeout_ms: 60000,
250//     //         read_retry_attempts: 3,
251//     //     }
252//     // }
253//
254//     // fn get_system_params() -> SystemParams {
255//     //     SystemParams {
256//     //         state_store: Some("state_store".into()),
257//     //         data_directory: Some("data_directory".into()),
258//     //         use_new_object_prefix_strategy: Some(true),
259//     //         backup_storage_url: Some("backup_storage_url".into()),
260//     //         backup_storage_directory: Some("backup_storage_directory".into()),
261//     //         ..SystemConfig::default().into_init_system_params()
262//     //     }
263//     // }
264//
265//     // TODO: support in-memory sql restore tests.
266//     // #[tokio::test]
267//     // async fn test_restore_basic() {
268//     //     let opts = get_restore_opts();
269//     //     let backup_store = get_backup_store(opts.clone()).await.unwrap();
270//     //     let nonempty_meta_store = get_meta_store(opts.clone()).await.unwrap();
271//     //     dispatch_meta_store!(nonempty_meta_store.clone(), store, {
272//     //         let stats = HummockVersionStats::default();
273//     //         stats.insert(&store).await.unwrap();
274//     //     });
275//     //     let empty_meta_store = get_meta_store(opts.clone()).await.unwrap();
276//     //     let system_param = get_system_params();
277//     //     let snapshot = MetaSnapshot {
278//     //         id: opts.meta_snapshot_id,
279//     //         metadata: ClusterMetadata {
280//     //             hummock_version: {
281//     //                 let mut version = HummockVersion::default();
282//     //                 version.id = HummockVersionId::new(123);
283//     //                 version
284//     //             },
285//     //             system_param: system_param.clone(),
286//     //             ..Default::default()
287//     //         },
288//     //         ..Default::default()
289//     //     };
290//     //
291//     //     // target snapshot not found
292//     //     restore_impl(opts.clone(), None, Some(backup_store.clone()))
293//     //         .await
294//     //         .unwrap_err();
295//     //
296//     //     backup_store.create(&snapshot, None).await.unwrap();
297//     //     restore_impl(opts.clone(), None, Some(backup_store.clone()))
298//     //         .await
299//     //         .unwrap();
300//     //
301//     //     // target meta store not empty
302//     //     restore_impl(
303//     //         opts.clone(),
304//     //         Some(nonempty_meta_store),
305//     //         Some(backup_store.clone()),
306//     //     )
307//     //     .await
308//     //     .unwrap_err();
309//     //
310//     //     restore_impl(
311//     //         opts.clone(),
312//     //         Some(empty_meta_store.clone()),
313//     //         Some(backup_store.clone()),
314//     //     )
315//     //     .await
316//     //     .unwrap();
317//     //
318//     //     dispatch_meta_store!(empty_meta_store, store, {
319//     //         let restored_system_param = SystemParams::get(&store).await.unwrap().unwrap();
320//     //         assert_eq!(restored_system_param, system_param);
321//     //     });
322//     // }
323//     //
324//     // #[tokio::test]
325//     // async fn test_restore_default_cf() {
326//     //     let opts = get_restore_opts();
327//     //     let backup_store = get_backup_store(opts.clone()).await.unwrap();
328//     //     let snapshot = MetaSnapshot {
329//     //         id: opts.meta_snapshot_id,
330//     //         metadata: ClusterMetadata {
331//     //             default_cf: HashMap::from([(vec![1u8, 2u8], memcomparable::to_vec(&10).unwrap())]),
332//     //             system_param: get_system_params(),
333//     //             ..Default::default()
334//     //         },
335//     //         ..Default::default()
336//     //     };
337//     //     backup_store.create(&snapshot, None).await.unwrap();
338//     //
339//     //     // `snapshot_2` is a superset of `snapshot`
340//     //     let mut snapshot_2 = MetaSnapshot {
341//     //         id: snapshot.id + 1,
342//     //         ..snapshot.clone()
343//     //     };
344//     //     snapshot_2
345//     //         .metadata
346//     //         .default_cf
347//     //         .insert(vec![1u8, 2u8], memcomparable::to_vec(&10).unwrap());
348//     //     snapshot_2
349//     //         .metadata
350//     //         .default_cf
351//     //         .insert(vec![10u8, 20u8], memcomparable::to_vec(&10).unwrap());
352//     //     backup_store.create(&snapshot_2, None).await.unwrap();
353//     //     let empty_meta_store = get_meta_store(opts.clone()).await.unwrap();
354//     //     restore_impl(
355//     //         opts.clone(),
356//     //         Some(empty_meta_store.clone()),
357//     //         Some(backup_store.clone()),
358//     //     )
359//     //     .await
360//     //     .unwrap();
361//     //     dispatch_meta_store!(empty_meta_store, store, {
362//     //         let mut kvs = store
363//     //             .list_cf(DEFAULT_COLUMN_FAMILY)
364//     //             .await
365//     //             .unwrap()
366//     //             .into_iter()
367//     //             .map(|(_, v)| v)
368//     //             .collect_vec();
369//     //         kvs.sort();
370//     //         assert_eq!(
371//     //             kvs,
372//     //             vec![
373//     //                 memcomparable::to_vec(&10).unwrap(),
374//     //                 memcomparable::to_vec(&10).unwrap()
375//     //             ]
376//     //         );
377//     //     });
378//     // }
379//     //
380//     // #[tokio::test]
381//     // #[should_panic]
382//     // async fn test_sanity_check_superset_requirement() {
383//     //     let opts = get_restore_opts();
384//     //     let backup_store = get_backup_store(opts.clone()).await.unwrap();
385//     //     let snapshot = MetaSnapshot {
386//     //         id: opts.meta_snapshot_id,
387//     //         metadata: ClusterMetadata {
388//     //             default_cf: HashMap::from([(vec![1u8, 2u8], memcomparable::to_vec(&10).unwrap())]),
389//     //             system_param: get_system_params(),
390//     //             ..Default::default()
391//     //         },
392//     //         ..Default::default()
393//     //     };
394//     //     backup_store.create(&snapshot, None).await.unwrap();
395//     //
396//     //     // violate superset requirement
397//     //     let mut snapshot_2 = MetaSnapshot {
398//     //         id: snapshot.id + 1,
399//     //         ..Default::default()
400//     //     };
401//     //     snapshot_2
402//     //         .metadata
403//     //         .default_cf
404//     //         .insert(vec![10u8, 20u8], memcomparable::to_vec(&1).unwrap());
405//     //     backup_store.create(&snapshot_2, None).await.unwrap();
406//     //     restore_impl(opts.clone(), None, Some(backup_store.clone()))
407//     //         .await
408//     //         .unwrap();
409//     // }
410//     //
411//     // #[tokio::test]
412//     // #[should_panic]
413//     // async fn test_sanity_check_monotonicity_requirement() {
414//     //     let opts = get_restore_opts();
415//     //     let backup_store = get_backup_store(opts.clone()).await.unwrap();
416//     //     let snapshot = MetaSnapshot {
417//     //         id: opts.meta_snapshot_id,
418//     //         metadata: ClusterMetadata {
419//     //             default_cf: HashMap::from([(vec![1u8, 2u8], memcomparable::to_vec(&10).unwrap())]),
420//     //             system_param: get_system_params(),
421//     //             ..Default::default()
422//     //         },
423//     //         ..Default::default()
424//     //     };
425//     //     backup_store.create(&snapshot, None).await.unwrap();
426//     //
427//     //     // violate monotonicity requirement
428//     //     let mut snapshot_2 = MetaSnapshot {
429//     //         id: snapshot.id + 1,
430//     //         ..Default::default()
431//     //     };
432//     //     snapshot_2
433//     //         .metadata
434//     //         .default_cf
435//     //         .insert(vec![1u8, 2u8], memcomparable::to_vec(&9).unwrap());
436//     //     backup_store.create(&snapshot_2, None).await.unwrap();
437//     //     restore_impl(opts.clone(), None, Some(backup_store.clone()))
438//     //         .await
439//     //         .unwrap();
440//     // }
441//     //
442//     // #[tokio::test]
443//     // async fn test_dry_run() {
444//     //     let mut opts = get_restore_opts();
445//     //     assert!(!opts.dry_run);
446//     //     opts.dry_run = true;
447//     //     let backup_store = get_backup_store(opts.clone()).await.unwrap();
448//     //     let empty_meta_store = get_meta_store(opts.clone()).await.unwrap();
449//     //     let system_param = get_system_params();
450//     //     let snapshot = MetaSnapshot {
451//     //         id: opts.meta_snapshot_id,
452//     //         metadata: ClusterMetadata {
453//     //             default_cf: HashMap::from([
454//     //                 (
455//     //                     "some_key_1".as_bytes().to_vec(),
456//     //                     memcomparable::to_vec(&10).unwrap(),
457//     //                 ),
458//     //                 (
459//     //                     "some_key_2".as_bytes().to_vec(),
460//     //                     memcomparable::to_vec(&"some_value_2".to_string()).unwrap(),
461//     //                 ),
462//     //             ]),
463//     //             hummock_version: {
464//     //                 let mut version = HummockVersion::default();
465//     //                 version.id = HummockVersionId::new(123);
466//     //                 version
467//     //             },
468//     //             system_param: system_param.clone(),
469//     //             ..Default::default()
470//     //         },
471//     //         ..Default::default()
472//     //     };
473//     //     backup_store.create(&snapshot, None).await.unwrap();
474//     //     restore_impl(
475//     //         opts.clone(),
476//     //         Some(empty_meta_store.clone()),
477//     //         Some(backup_store.clone()),
478//     //     )
479//     //     .await
480//     //     .unwrap();
481//     //
482//     //     dispatch_meta_store!(empty_meta_store, store, {
483//     //         assert!(SystemParams::get(&store).await.unwrap().is_none());
484//     //     });
485//     // }
486// }