risingwave_meta/backup_restore/
restore.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use futures::TryStreamExt;
20use risingwave_backup::MetaSnapshotId;
21use risingwave_backup::error::{BackupError, BackupResult};
22use risingwave_backup::meta_snapshot::Metadata;
23use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
24use risingwave_common::config::{MetaBackend, ObjectStoreConfig};
25use risingwave_hummock_sdk::version::HummockVersion;
26use risingwave_hummock_sdk::{
27    HummockRawObjectId, try_get_object_id_from_path, version_checkpoint_path,
28};
29use risingwave_object_store::object::build_remote_object_store;
30use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
31use risingwave_pb::hummock::{PbHummockVersionCheckpoint, PbHummockVersionCheckpointEnvelope};
32use thiserror_ext::AsReport;
33
34use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2};
35use crate::backup_restore::restore_impl::{Loader, Writer};
36use crate::backup_restore::utils::{get_backup_store, get_meta_store};
37use crate::controller::SqlMetaStore;
38use crate::hummock::compress_payload;
39
/// Command-line arguments for restore.
#[derive(clap::Args, Debug, Clone)]
pub struct RestoreOpts {
    /// Id of snapshot used to restore. Available snapshots can be found in
    /// `<storage_directory>/manifest.json`.
    #[clap(long)]
    pub meta_snapshot_id: u64,
    /// Type of meta store to restore.
    #[clap(long, value_enum, default_value_t = MetaBackend::Mem)]
    pub meta_store_type: MetaBackend,
    /// Endpoint of sql backend.
    #[clap(long, default_value_t = String::from(""))]
    pub sql_endpoint: String,
    /// Username of sql backend, required when meta backend set to MySQL or PostgreSQL.
    #[clap(long, default_value = "")]
    pub sql_username: String,
    /// Password of sql backend, required when meta backend set to MySQL or PostgreSQL.
    #[clap(long, default_value = "")]
    pub sql_password: String,
    /// Database of sql backend, required when meta backend set to MySQL or PostgreSQL.
    #[clap(long, default_value = "")]
    pub sql_database: String,
    /// Params for the URL connection, such as `sslmode=disable`.
    /// Example: `param1=value1&param2=value2`
    #[clap(long, required = false)]
    pub sql_url_params: Option<String>,
    /// Url of storage to fetch meta snapshot from.
    #[clap(long)]
    pub backup_storage_url: String,
    /// Directory of storage to fetch meta snapshot from.
    #[clap(long, default_value_t = String::from("backup"))]
    pub backup_storage_directory: String,
    /// Url of storage to restore hummock version to.
    #[clap(long)]
    pub hummock_storage_url: String,
    /// Directory of storage to restore hummock version to.
    #[clap(long, default_value_t = String::from("hummock_001"))]
    pub hummock_storage_directory: String,
    /// Print the target snapshot, but won't restore to meta store.
    #[clap(long, default_value_t = false)]
    pub dry_run: bool,
    /// The read timeout for object store, in milliseconds.
    #[clap(long, default_value_t = 600000)]
    pub read_attempt_timeout_ms: u64,
    /// The maximum number of read retry attempts for the object store.
    #[clap(long, default_value_t = 3)]
    pub read_retry_attempts: u64,
    /// When enabled, some system parameters in the restored meta store will be overwritten.
    /// Specifically, system parameters `state_store`, `data_directory`, `backup_storage_url` and `backup_storage_directory` will be overwritten
    /// with the specified opts `hummock_storage_url`, `hummock_storage_directory`, `overwrite_backup_storage_url` and `overwrite_backup_storage_directory`.
    /// Both `overwrite_backup_storage_url` and `overwrite_backup_storage_directory` must be set when this is enabled.
    #[clap(long, default_value_t = false)]
    pub overwrite_hummock_storage_endpoint: bool,
    /// New value for the `backup_storage_url` system parameter.
    /// Must be set when `overwrite_hummock_storage_endpoint` is enabled.
    #[clap(long, required = false)]
    pub overwrite_backup_storage_url: Option<String>,
    /// New value for the `backup_storage_directory` system parameter.
    /// Must be set when `overwrite_hummock_storage_endpoint` is enabled.
    #[clap(long, required = false)]
    pub overwrite_backup_storage_directory: Option<String>,
    /// Verify that all referenced objects exist in object store.
    #[clap(long, default_value_t = false)]
    pub validate_integrity: bool,
}
99
100async fn restore_hummock_version(
101    hummock_storage_url: &str,
102    hummock_storage_directory: &str,
103    hummock_version: &HummockVersion,
104) -> BackupResult<()> {
105    let object_store = Arc::new(
106        build_remote_object_store(
107            hummock_storage_url,
108            Arc::new(ObjectStoreMetrics::unused()),
109            "Version Checkpoint",
110            Arc::new(ObjectStoreConfig::default()),
111        )
112        .await,
113    );
114    let checkpoint_path = version_checkpoint_path(hummock_storage_directory);
115    let checkpoint = PbHummockVersionCheckpoint {
116        version: Some(hummock_version.into()),
117        // Ignore stale objects. Full GC will clear them.
118        stale_objects: Default::default(),
119    };
120    use anyhow::Context;
121    use prost::Message;
122    use risingwave_common::config::CheckpointCompression;
123
124    // Use zstd compression by default. The next checkpoint written by the cluster
125    // will use the configured compression algorithm from the config file.
126    let compression = CheckpointCompression::Zstd;
127    let raw_bytes = checkpoint.encode_to_vec();
128    let compressed = compress_payload(compression, &raw_bytes)
129        .context("zstd compression failed")
130        .map_err(BackupError::Other)?;
131    let checksum = crate::hummock::xxhash64_checksum(&compressed);
132    let envelope = PbHummockVersionCheckpointEnvelope {
133        compression_algorithm: compression as i32,
134        payload: compressed,
135        checksum: Some(checksum),
136    };
137    let buf = envelope.encode_to_vec();
138    object_store
139        .upload(&checkpoint_path, buf.into())
140        .await
141        .map_err(|e| BackupError::StateStorage(e.into()))?;
142    Ok(())
143}
144
145/// Restores a meta store.
146/// Uses `meta_store` and `backup_store` if provided.
147/// Otherwise creates them based on `opts`.
148async fn restore_impl(
149    opts: RestoreOpts,
150    meta_store: Option<SqlMetaStore>,
151    backup_store: Option<MetaSnapshotStorageRef>,
152) -> BackupResult<()> {
153    if cfg!(not(test)) {
154        assert!(meta_store.is_none());
155        assert!(backup_store.is_none());
156    }
157    let meta_store = match meta_store {
158        None => get_meta_store(opts.clone()).await?,
159        Some(m) => m,
160    };
161    let backup_store = match backup_store {
162        None => get_backup_store(opts.clone()).await?,
163        Some(b) => b,
164    };
165    let target_id = opts.meta_snapshot_id;
166    let snapshot_list = &backup_store.manifest().await.snapshot_metadata;
167    let snapshot = match snapshot_list.iter().find(|m| m.id == target_id) {
168        None => {
169            return Err(BackupError::Other(anyhow::anyhow!(
170                "snapshot id {} not found",
171                target_id
172            )));
173        }
174        Some(s) => s,
175    };
176
177    if opts.validate_integrity {
178        tracing::info!("Start integrity validation.");
179        validate_integrity(
180            snapshot.objects.clone(),
181            &opts.hummock_storage_url,
182            &opts.hummock_storage_directory,
183        )
184        .await
185        .inspect_err(|_| tracing::error!("Fail integrity validation."))?;
186        tracing::info!("Succeed integrity validation.");
187    }
188
189    let format_version = snapshot.format_version;
190    if format_version < 2 {
191        unimplemented!("not supported: write model V1 to meta store V2");
192    } else {
193        dispatch(
194            target_id,
195            &opts,
196            LoaderV2::new(backup_store),
197            WriterModelV2ToMetaStoreV2::new(meta_store),
198        )
199        .await?;
200    }
201
202    Ok(())
203}
204
205async fn dispatch<L: Loader<S>, W: Writer<S>, S: Metadata>(
206    target_id: MetaSnapshotId,
207    opts: &RestoreOpts,
208    loader: L,
209    writer: W,
210) -> BackupResult<()> {
211    // Validate parameters.
212    if opts.overwrite_hummock_storage_endpoint
213        && (opts.overwrite_backup_storage_url.is_none()
214            || opts.overwrite_backup_storage_directory.is_none())
215    {
216        return Err(BackupError::Other(anyhow::anyhow!("overwrite_hummock_storage_endpoint, overwrite_backup_storage_url, overwrite_backup_storage_directory must be set simultaneously".to_owned())));
217    }
218
219    // Restore meta store.
220    let target_snapshot = loader.load(target_id).await?;
221    if !opts.overwrite_hummock_storage_endpoint {
222        let storage_url = target_snapshot.metadata.storage_url()?;
223        if storage_url != opts.hummock_storage_url {
224            return Err(BackupError::Other(anyhow::anyhow!(
225                "storage_url mismatch: {} {}",
226                storage_url,
227                opts.hummock_storage_url
228            )));
229        }
230        let storage_directory = target_snapshot.metadata.storage_directory()?;
231        if storage_directory != opts.hummock_storage_directory {
232            return Err(BackupError::Other(anyhow::anyhow!(
233                "storage_directory mismatch: {} {}",
234                storage_directory,
235                opts.hummock_storage_directory
236            )));
237        }
238    }
239
240    if opts.dry_run {
241        tracing::info!("Complete dry run.");
242        return Ok(());
243    }
244    let hummock_version = target_snapshot.metadata.hummock_version_ref().clone();
245    writer.write(target_snapshot).await?;
246    if opts.overwrite_hummock_storage_endpoint {
247        writer
248            .overwrite(
249                &format!("hummock+{}", opts.hummock_storage_url),
250                &opts.hummock_storage_directory,
251                opts.overwrite_backup_storage_url.as_ref().unwrap(),
252                opts.overwrite_backup_storage_directory.as_ref().unwrap(),
253            )
254            .await?;
255    }
256
257    // Restore object store.
258    restore_hummock_version(
259        &opts.hummock_storage_url,
260        &opts.hummock_storage_directory,
261        &hummock_version,
262    )
263    .await?;
264    Ok(())
265}
266
267pub async fn restore(opts: RestoreOpts) -> BackupResult<()> {
268    tracing::info!("restore with opts: {:#?}", opts);
269    let result = restore_impl(opts, None, None).await;
270    match &result {
271        Ok(_) => {
272            tracing::info!("command succeeded");
273        }
274        Err(e) => {
275            tracing::warn!(error = %e.as_report(), "command failed");
276        }
277    }
278    result
279}
280
281async fn validate_integrity(
282    mut object_ids: HashSet<HummockRawObjectId>,
283    hummock_storage_url: &str,
284    hummock_storage_directory: &str,
285) -> BackupResult<()> {
286    tracing::info!("expect {} objects", object_ids.len());
287    let object_store = Arc::new(
288        build_remote_object_store(
289            hummock_storage_url,
290            Arc::new(ObjectStoreMetrics::unused()),
291            "Version Checkpoint",
292            Arc::new(ObjectStoreConfig::default()),
293        )
294        .await,
295    );
296    let mut iter = object_store
297        .list(hummock_storage_directory, None, None)
298        .await?;
299    while let Some(obj) = iter.try_next().await? {
300        let Some(obj_id) = try_get_object_id_from_path(&obj.key) else {
301            continue;
302        };
303        if object_ids.remove(&obj_id.as_raw()) && object_ids.is_empty() {
304            break;
305        }
306    }
307    if object_ids.is_empty() {
308        return Ok(());
309    }
310    Err(BackupError::Other(anyhow!(
311        "referenced objects not found in object store: {:?}",
312        object_ids
313    )))
314}