risingwave_meta/backup_restore/
restore.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use futures::TryStreamExt;
20use risingwave_backup::MetaSnapshotId;
21use risingwave_backup::error::{BackupError, BackupResult};
22use risingwave_backup::meta_snapshot::Metadata;
23use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
24use risingwave_common::config::{MetaBackend, ObjectStoreConfig};
25use risingwave_hummock_sdk::version::HummockVersion;
26use risingwave_hummock_sdk::{
27    HummockRawObjectId, try_get_object_id_from_path, version_checkpoint_path,
28};
29use risingwave_object_store::object::build_remote_object_store;
30use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
31use risingwave_pb::hummock::PbHummockVersionCheckpoint;
32use thiserror_ext::AsReport;
33
34use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2};
35use crate::backup_restore::restore_impl::{Loader, Writer};
36use crate::backup_restore::utils::{get_backup_store, get_meta_store};
37use crate::controller::SqlMetaStore;
38
39/// Command-line arguments for restore.
40#[derive(clap::Args, Debug, Clone)]
41pub struct RestoreOpts {
42    /// Id of snapshot used to restore. Available snapshots can be found in
43    /// <`storage_directory>/manifest.json`.
44    #[clap(long)]
45    pub meta_snapshot_id: u64,
46    /// Type of meta store to restore.
47    #[clap(long, value_enum, default_value_t = MetaBackend::Mem)]
48    pub meta_store_type: MetaBackend,
49    #[clap(long, default_value_t = String::from(""))]
50    pub sql_endpoint: String,
51    /// Username of sql backend, required when meta backend set to MySQL or PostgreSQL.
52    #[clap(long, default_value = "")]
53    pub sql_username: String,
54    /// Password of sql backend, required when meta backend set to MySQL or PostgreSQL.
55    #[clap(long, default_value = "")]
56    pub sql_password: String,
57    /// Database of sql backend, required when meta backend set to MySQL or PostgreSQL.
58    #[clap(long, default_value = "")]
59    pub sql_database: String,
60    /// Params for the URL connection, such as `sslmode=disable`.
61    /// Example: `param1=value1&param2=value2`
62    #[clap(long, required = false)]
63    pub sql_url_params: Option<String>,
64    /// Url of storage to fetch meta snapshot from.
65    #[clap(long)]
66    pub backup_storage_url: String,
67    /// Directory of storage to fetch meta snapshot from.
68    #[clap(long, default_value_t = String::from("backup"))]
69    pub backup_storage_directory: String,
70    /// Url of storage to restore hummock version to.
71    #[clap(long)]
72    pub hummock_storage_url: String,
73    /// Directory of storage to restore hummock version to.
74    #[clap(long, default_value_t = String::from("hummock_001"))]
75    pub hummock_storage_directory: String,
76    /// Print the target snapshot, but won't restore to meta store.
77    #[clap(long, default_value_t = false)]
78    pub dry_run: bool,
79    /// The read timeout for object store
80    #[clap(long, default_value_t = 600000)]
81    pub read_attempt_timeout_ms: u64,
82    /// The maximum number of read retry attempts for the object store.
83    #[clap(long, default_value_t = 3)]
84    pub read_retry_attempts: u64,
85    #[clap(long, default_value_t = false)]
86    /// When enabled, some system parameters of in the restored meta store will be overwritten.
87    /// Specifically, system parameters `state_store`, `data_directory`, `backup_storage_url` and `backup_storage_directory` will be overwritten
88    /// with the specified opts `hummock_storage_url`, `hummock_storage_directory`, `overwrite_backup_storage_url` and `overwrite_backup_storage_directory`.
89    pub overwrite_hummock_storage_endpoint: bool,
90    #[clap(long, required = false)]
91    pub overwrite_backup_storage_url: Option<String>,
92    #[clap(long, required = false)]
93    pub overwrite_backup_storage_directory: Option<String>,
94    /// Verify that all referenced objects exist in object store.
95    #[clap(long, default_value_t = false)]
96    pub validate_integrity: bool,
97}
98
99async fn restore_hummock_version(
100    hummock_storage_url: &str,
101    hummock_storage_directory: &str,
102    hummock_version: &HummockVersion,
103) -> BackupResult<()> {
104    let object_store = Arc::new(
105        build_remote_object_store(
106            hummock_storage_url,
107            Arc::new(ObjectStoreMetrics::unused()),
108            "Version Checkpoint",
109            Arc::new(ObjectStoreConfig::default()),
110        )
111        .await,
112    );
113    let checkpoint_path = version_checkpoint_path(hummock_storage_directory);
114    let checkpoint = PbHummockVersionCheckpoint {
115        version: Some(hummock_version.into()),
116        // Ignore stale objects. Full GC will clear them.
117        stale_objects: Default::default(),
118    };
119    use prost::Message;
120    let buf = checkpoint.encode_to_vec();
121    object_store
122        .upload(&checkpoint_path, buf.into())
123        .await
124        .map_err(|e| BackupError::StateStorage(e.into()))?;
125    Ok(())
126}
127
128/// Restores a meta store.
129/// Uses `meta_store` and `backup_store` if provided.
130/// Otherwise creates them based on `opts`.
131async fn restore_impl(
132    opts: RestoreOpts,
133    meta_store: Option<SqlMetaStore>,
134    backup_store: Option<MetaSnapshotStorageRef>,
135) -> BackupResult<()> {
136    if cfg!(not(test)) {
137        assert!(meta_store.is_none());
138        assert!(backup_store.is_none());
139    }
140    let meta_store = match meta_store {
141        None => get_meta_store(opts.clone()).await?,
142        Some(m) => m,
143    };
144    let backup_store = match backup_store {
145        None => get_backup_store(opts.clone()).await?,
146        Some(b) => b,
147    };
148    let target_id = opts.meta_snapshot_id;
149    let snapshot_list = &backup_store.manifest().snapshot_metadata;
150    let snapshot = match snapshot_list.iter().find(|m| m.id == target_id) {
151        None => {
152            return Err(BackupError::Other(anyhow::anyhow!(
153                "snapshot id {} not found",
154                target_id
155            )));
156        }
157        Some(s) => s,
158    };
159
160    if opts.validate_integrity {
161        tracing::info!("Start integrity validation.");
162        validate_integrity(
163            snapshot.objects.clone(),
164            &opts.hummock_storage_url,
165            &opts.hummock_storage_directory,
166        )
167        .await
168        .inspect_err(|_| tracing::error!("Fail integrity validation."))?;
169        tracing::info!("Succeed integrity validation.");
170    }
171
172    let format_version = snapshot.format_version;
173    if format_version < 2 {
174        unimplemented!("not supported: write model V1 to meta store V2");
175    } else {
176        dispatch(
177            target_id,
178            &opts,
179            LoaderV2::new(backup_store),
180            WriterModelV2ToMetaStoreV2::new(meta_store.to_owned()),
181        )
182        .await?;
183    }
184
185    Ok(())
186}
187
188async fn dispatch<L: Loader<S>, W: Writer<S>, S: Metadata>(
189    target_id: MetaSnapshotId,
190    opts: &RestoreOpts,
191    loader: L,
192    writer: W,
193) -> BackupResult<()> {
194    // Validate parameters.
195    if opts.overwrite_hummock_storage_endpoint
196        && (opts.overwrite_backup_storage_url.is_none()
197            || opts.overwrite_backup_storage_directory.is_none())
198    {
199        return Err(BackupError::Other(anyhow::anyhow!("overwrite_hummock_storage_endpoint, overwrite_backup_storage_url, overwrite_backup_storage_directory must be set simultaneously".to_owned())));
200    }
201
202    // Restore meta store.
203    let target_snapshot = loader.load(target_id).await?;
204    if opts.dry_run {
205        tracing::info!("Complete dry run.");
206        return Ok(());
207    }
208    let hummock_version = target_snapshot.metadata.hummock_version_ref().clone();
209    writer.write(target_snapshot).await?;
210    if opts.overwrite_hummock_storage_endpoint {
211        writer
212            .overwrite(
213                &format!("hummock+{}", opts.hummock_storage_url),
214                &opts.hummock_storage_directory,
215                opts.overwrite_backup_storage_url.as_ref().unwrap(),
216                opts.overwrite_backup_storage_directory.as_ref().unwrap(),
217            )
218            .await?;
219    }
220
221    // Restore object store.
222    restore_hummock_version(
223        &opts.hummock_storage_url,
224        &opts.hummock_storage_directory,
225        &hummock_version,
226    )
227    .await?;
228    Ok(())
229}
230
231pub async fn restore(opts: RestoreOpts) -> BackupResult<()> {
232    tracing::info!("restore with opts: {:#?}", opts);
233    let result = restore_impl(opts, None, None).await;
234    match &result {
235        Ok(_) => {
236            tracing::info!("command succeeded");
237        }
238        Err(e) => {
239            tracing::warn!(error = %e.as_report(), "command failed");
240        }
241    }
242    result
243}
244
245async fn validate_integrity(
246    mut object_ids: HashSet<HummockRawObjectId>,
247    hummock_storage_url: &str,
248    hummock_storage_directory: &str,
249) -> BackupResult<()> {
250    tracing::info!("expect {} objects", object_ids.len());
251    let object_store = Arc::new(
252        build_remote_object_store(
253            hummock_storage_url,
254            Arc::new(ObjectStoreMetrics::unused()),
255            "Version Checkpoint",
256            Arc::new(ObjectStoreConfig::default()),
257        )
258        .await,
259    );
260    let mut iter = object_store
261        .list(hummock_storage_directory, None, None)
262        .await?;
263    while let Some(obj) = iter.try_next().await? {
264        let Some(obj_id) = try_get_object_id_from_path(&obj.key) else {
265            continue;
266        };
267        if object_ids.remove(&obj_id.as_raw()) && object_ids.is_empty() {
268            break;
269        }
270    }
271    if object_ids.is_empty() {
272        return Ok(());
273    }
274    Err(BackupError::Other(anyhow!(
275        "referenced objects not found in object store: {:?}",
276        object_ids
277    )))
278}