risingwave_meta/backup_restore/
restore.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use futures::TryStreamExt;
20use risingwave_backup::MetaSnapshotId;
21use risingwave_backup::error::{BackupError, BackupResult};
22use risingwave_backup::meta_snapshot::Metadata;
23use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
24use risingwave_common::config::{MetaBackend, ObjectStoreConfig};
25use risingwave_hummock_sdk::version::HummockVersion;
26use risingwave_hummock_sdk::{HummockSstableObjectId, OBJECT_SUFFIX, version_checkpoint_path};
27use risingwave_object_store::object::build_remote_object_store;
28use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
29use risingwave_pb::hummock::PbHummockVersionCheckpoint;
30use thiserror_ext::AsReport;
31
32use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2};
33use crate::backup_restore::restore_impl::{Loader, Writer};
34use crate::backup_restore::utils::{get_backup_store, get_meta_store};
35use crate::controller::SqlMetaStore;
36
37/// Command-line arguments for restore.
38#[derive(clap::Args, Debug, Clone)]
39pub struct RestoreOpts {
40    /// Id of snapshot used to restore. Available snapshots can be found in
41    /// <`storage_directory>/manifest.json`.
42    #[clap(long)]
43    pub meta_snapshot_id: u64,
44    /// Type of meta store to restore.
45    #[clap(long, value_enum, default_value_t = MetaBackend::Mem)]
46    pub meta_store_type: MetaBackend,
47    #[clap(long, default_value_t = String::from(""))]
48    pub sql_endpoint: String,
49    /// Username of sql backend, required when meta backend set to MySQL or PostgreSQL.
50    #[clap(long, default_value = "")]
51    pub sql_username: String,
52    /// Password of sql backend, required when meta backend set to MySQL or PostgreSQL.
53    #[clap(long, default_value = "")]
54    pub sql_password: String,
55    /// Database of sql backend, required when meta backend set to MySQL or PostgreSQL.
56    #[clap(long, default_value = "")]
57    pub sql_database: String,
58    /// Url of storage to fetch meta snapshot from.
59    #[clap(long)]
60    pub backup_storage_url: String,
61    /// Directory of storage to fetch meta snapshot from.
62    #[clap(long, default_value_t = String::from("backup"))]
63    pub backup_storage_directory: String,
64    /// Url of storage to restore hummock version to.
65    #[clap(long)]
66    pub hummock_storage_url: String,
67    /// Directory of storage to restore hummock version to.
68    #[clap(long, default_value_t = String::from("hummock_001"))]
69    pub hummock_storage_directory: String,
70    /// Print the target snapshot, but won't restore to meta store.
71    #[clap(long, default_value_t = false)]
72    pub dry_run: bool,
73    /// The read timeout for object store
74    #[clap(long, default_value_t = 600000)]
75    pub read_attempt_timeout_ms: u64,
76    /// The maximum number of read retry attempts for the object store.
77    #[clap(long, default_value_t = 3)]
78    pub read_retry_attempts: u64,
79    #[clap(long, default_value_t = false)]
80    /// When enabled, some system parameters of in the restored meta store will be overwritten.
81    /// Specifically, system parameters `state_store`, `data_directory`, `backup_storage_url` and `backup_storage_directory` will be overwritten
82    /// with the specified opts `hummock_storage_url`, `hummock_storage_directory`, `overwrite_backup_storage_url` and `overwrite_backup_storage_directory`.
83    pub overwrite_hummock_storage_endpoint: bool,
84    #[clap(long, required = false)]
85    pub overwrite_backup_storage_url: Option<String>,
86    #[clap(long, required = false)]
87    pub overwrite_backup_storage_directory: Option<String>,
88    /// Verify that all referenced objects exist in object store.
89    #[clap(long, default_value_t = false)]
90    pub validate_integrity: bool,
91}
92
93async fn restore_hummock_version(
94    hummock_storage_url: &str,
95    hummock_storage_directory: &str,
96    hummock_version: &HummockVersion,
97) -> BackupResult<()> {
98    let object_store = Arc::new(
99        build_remote_object_store(
100            hummock_storage_url,
101            Arc::new(ObjectStoreMetrics::unused()),
102            "Version Checkpoint",
103            Arc::new(ObjectStoreConfig::default()),
104        )
105        .await,
106    );
107    let checkpoint_path = version_checkpoint_path(hummock_storage_directory);
108    let checkpoint = PbHummockVersionCheckpoint {
109        version: Some(hummock_version.into()),
110        // Ignore stale objects. Full GC will clear them.
111        stale_objects: Default::default(),
112    };
113    use prost::Message;
114    let buf = checkpoint.encode_to_vec();
115    object_store
116        .upload(&checkpoint_path, buf.into())
117        .await
118        .map_err(|e| BackupError::StateStorage(e.into()))?;
119    Ok(())
120}
121
122/// Restores a meta store.
123/// Uses `meta_store` and `backup_store` if provided.
124/// Otherwise creates them based on `opts`.
125async fn restore_impl(
126    opts: RestoreOpts,
127    meta_store: Option<SqlMetaStore>,
128    backup_store: Option<MetaSnapshotStorageRef>,
129) -> BackupResult<()> {
130    if cfg!(not(test)) {
131        assert!(meta_store.is_none());
132        assert!(backup_store.is_none());
133    }
134    let meta_store = match meta_store {
135        None => get_meta_store(opts.clone()).await?,
136        Some(m) => m,
137    };
138    let backup_store = match backup_store {
139        None => get_backup_store(opts.clone()).await?,
140        Some(b) => b,
141    };
142    let target_id = opts.meta_snapshot_id;
143    let snapshot_list = &backup_store.manifest().snapshot_metadata;
144    let snapshot = match snapshot_list.iter().find(|m| m.id == target_id) {
145        None => {
146            return Err(BackupError::Other(anyhow::anyhow!(
147                "snapshot id {} not found",
148                target_id
149            )));
150        }
151        Some(s) => s,
152    };
153
154    if opts.validate_integrity {
155        tracing::info!("Start integrity validation.");
156        validate_integrity(
157            snapshot.ssts.clone(),
158            &opts.hummock_storage_url,
159            &opts.hummock_storage_directory,
160        )
161        .await
162        .inspect_err(|_| tracing::error!("Fail integrity validation."))?;
163        tracing::info!("Succeed integrity validation.");
164    }
165
166    let format_version = snapshot.format_version;
167    if format_version < 2 {
168        unimplemented!("not supported: write model V1 to meta store V2");
169    } else {
170        dispatch(
171            target_id,
172            &opts,
173            LoaderV2::new(backup_store),
174            WriterModelV2ToMetaStoreV2::new(meta_store.to_owned()),
175        )
176        .await?;
177    }
178
179    Ok(())
180}
181
182async fn dispatch<L: Loader<S>, W: Writer<S>, S: Metadata>(
183    target_id: MetaSnapshotId,
184    opts: &RestoreOpts,
185    loader: L,
186    writer: W,
187) -> BackupResult<()> {
188    // Validate parameters.
189    if opts.overwrite_hummock_storage_endpoint
190        && (opts.overwrite_backup_storage_url.is_none()
191            || opts.overwrite_backup_storage_directory.is_none())
192    {
193        return Err(BackupError::Other(anyhow::anyhow!("overwrite_hummock_storage_endpoint, overwrite_backup_storage_url, overwrite_backup_storage_directory must be set simultaneously".to_owned())));
194    }
195
196    // Restore meta store.
197    let target_snapshot = loader.load(target_id).await?;
198    if opts.dry_run {
199        tracing::info!("Complete dry run.");
200        return Ok(());
201    }
202    let hummock_version = target_snapshot.metadata.hummock_version_ref().clone();
203    writer.write(target_snapshot).await?;
204    if opts.overwrite_hummock_storage_endpoint {
205        writer
206            .overwrite(
207                &format!("hummock+{}", opts.hummock_storage_url),
208                &opts.hummock_storage_directory,
209                opts.overwrite_backup_storage_url.as_ref().unwrap(),
210                opts.overwrite_backup_storage_directory.as_ref().unwrap(),
211            )
212            .await?;
213    }
214
215    // Restore object store.
216    restore_hummock_version(
217        &opts.hummock_storage_url,
218        &opts.hummock_storage_directory,
219        &hummock_version,
220    )
221    .await?;
222    Ok(())
223}
224
225pub async fn restore(opts: RestoreOpts) -> BackupResult<()> {
226    tracing::info!("restore with opts: {:#?}", opts);
227    let result = restore_impl(opts, None, None).await;
228    match &result {
229        Ok(_) => {
230            tracing::info!("command succeeded");
231        }
232        Err(e) => {
233            tracing::warn!(error = %e.as_report(), "command failed");
234        }
235    }
236    result
237}
238
239async fn validate_integrity(
240    mut object_ids: HashSet<HummockSstableObjectId>,
241    hummock_storage_url: &str,
242    hummock_storage_directory: &str,
243) -> BackupResult<()> {
244    fn try_get_object_id_from_path(path: &str) -> Option<HummockSstableObjectId> {
245        let split: Vec<_> = path.split(&['/', '.']).collect();
246        if split.len() <= 2 {
247            return None;
248        }
249        if split[split.len() - 1] != OBJECT_SUFFIX {
250            return None;
251        }
252        let id = split[split.len() - 2]
253            .parse::<HummockSstableObjectId>()
254            .unwrap_or_else(|_| panic!("expect valid sst id, got {}", split[split.len() - 2]));
255        Some(id)
256    }
257    tracing::info!("expect {} objects", object_ids.len());
258    let object_store = Arc::new(
259        build_remote_object_store(
260            hummock_storage_url,
261            Arc::new(ObjectStoreMetrics::unused()),
262            "Version Checkpoint",
263            Arc::new(ObjectStoreConfig::default()),
264        )
265        .await,
266    );
267    let mut iter = object_store
268        .list(hummock_storage_directory, None, None)
269        .await?;
270    while let Some(obj) = iter.try_next().await? {
271        let Some(obj_id) = try_get_object_id_from_path(&obj.key) else {
272            continue;
273        };
274        if object_ids.remove(&obj_id) && object_ids.is_empty() {
275            break;
276        }
277    }
278    if object_ids.is_empty() {
279        return Ok(());
280    }
281    Err(BackupError::Other(anyhow!(
282        "referenced objects not found in object store: {:?}",
283        object_ids
284    )))
285}