risingwave_meta/backup_restore/
restore.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use futures::TryStreamExt;
20use risingwave_backup::MetaSnapshotId;
21use risingwave_backup::error::{BackupError, BackupResult};
22use risingwave_backup::meta_snapshot::Metadata;
23use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
24use risingwave_common::config::{MetaBackend, ObjectStoreConfig};
25use risingwave_hummock_sdk::version::HummockVersion;
26use risingwave_hummock_sdk::{
27 HummockRawObjectId, try_get_object_id_from_path, version_checkpoint_path,
28};
29use risingwave_object_store::object::build_remote_object_store;
30use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
31use risingwave_pb::hummock::{PbHummockVersionCheckpoint, PbHummockVersionCheckpointEnvelope};
32use thiserror_ext::AsReport;
33
34use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2};
35use crate::backup_restore::restore_impl::{Loader, Writer};
36use crate::backup_restore::utils::{get_backup_store, get_meta_store};
37use crate::controller::SqlMetaStore;
38use crate::hummock::compress_payload;
39
40#[derive(clap::Args, Debug, Clone)]
42pub struct RestoreOpts {
43 #[clap(long)]
46 pub meta_snapshot_id: u64,
47 #[clap(long, value_enum, default_value_t = MetaBackend::Mem)]
49 pub meta_store_type: MetaBackend,
50 #[clap(long, default_value_t = String::from(""))]
51 pub sql_endpoint: String,
52 #[clap(long, default_value = "")]
54 pub sql_username: String,
55 #[clap(long, default_value = "")]
57 pub sql_password: String,
58 #[clap(long, default_value = "")]
60 pub sql_database: String,
61 #[clap(long, required = false)]
64 pub sql_url_params: Option<String>,
65 #[clap(long)]
67 pub backup_storage_url: String,
68 #[clap(long, default_value_t = String::from("backup"))]
70 pub backup_storage_directory: String,
71 #[clap(long)]
73 pub hummock_storage_url: String,
74 #[clap(long, default_value_t = String::from("hummock_001"))]
76 pub hummock_storage_directory: String,
77 #[clap(long, default_value_t = false)]
79 pub dry_run: bool,
80 #[clap(long, default_value_t = 600000)]
82 pub read_attempt_timeout_ms: u64,
83 #[clap(long, default_value_t = 3)]
85 pub read_retry_attempts: u64,
86 #[clap(long, default_value_t = false)]
87 pub overwrite_hummock_storage_endpoint: bool,
91 #[clap(long, required = false)]
92 pub overwrite_backup_storage_url: Option<String>,
93 #[clap(long, required = false)]
94 pub overwrite_backup_storage_directory: Option<String>,
95 #[clap(long, default_value_t = false)]
97 pub validate_integrity: bool,
98}
99
100async fn restore_hummock_version(
101 hummock_storage_url: &str,
102 hummock_storage_directory: &str,
103 hummock_version: &HummockVersion,
104) -> BackupResult<()> {
105 let object_store = Arc::new(
106 build_remote_object_store(
107 hummock_storage_url,
108 Arc::new(ObjectStoreMetrics::unused()),
109 "Version Checkpoint",
110 Arc::new(ObjectStoreConfig::default()),
111 )
112 .await,
113 );
114 let checkpoint_path = version_checkpoint_path(hummock_storage_directory);
115 let checkpoint = PbHummockVersionCheckpoint {
116 version: Some(hummock_version.into()),
117 stale_objects: Default::default(),
119 };
120 use anyhow::Context;
121 use prost::Message;
122 use risingwave_common::config::CheckpointCompression;
123
124 let compression = CheckpointCompression::Zstd;
127 let raw_bytes = checkpoint.encode_to_vec();
128 let compressed = compress_payload(compression, &raw_bytes)
129 .context("zstd compression failed")
130 .map_err(BackupError::Other)?;
131 let checksum = crate::hummock::xxhash64_checksum(&compressed);
132 let envelope = PbHummockVersionCheckpointEnvelope {
133 compression_algorithm: compression as i32,
134 payload: compressed,
135 checksum: Some(checksum),
136 };
137 let buf = envelope.encode_to_vec();
138 object_store
139 .upload(&checkpoint_path, buf.into())
140 .await
141 .map_err(|e| BackupError::StateStorage(e.into()))?;
142 Ok(())
143}
144
145async fn restore_impl(
149 opts: RestoreOpts,
150 meta_store: Option<SqlMetaStore>,
151 backup_store: Option<MetaSnapshotStorageRef>,
152) -> BackupResult<()> {
153 if cfg!(not(test)) {
154 assert!(meta_store.is_none());
155 assert!(backup_store.is_none());
156 }
157 let meta_store = match meta_store {
158 None => get_meta_store(opts.clone()).await?,
159 Some(m) => m,
160 };
161 let backup_store = match backup_store {
162 None => get_backup_store(opts.clone()).await?,
163 Some(b) => b,
164 };
165 let target_id = opts.meta_snapshot_id;
166 let snapshot_list = &backup_store.manifest().await.snapshot_metadata;
167 let snapshot = match snapshot_list.iter().find(|m| m.id == target_id) {
168 None => {
169 return Err(BackupError::Other(anyhow::anyhow!(
170 "snapshot id {} not found",
171 target_id
172 )));
173 }
174 Some(s) => s,
175 };
176
177 if opts.validate_integrity {
178 tracing::info!("Start integrity validation.");
179 validate_integrity(
180 snapshot.objects.clone(),
181 &opts.hummock_storage_url,
182 &opts.hummock_storage_directory,
183 )
184 .await
185 .inspect_err(|_| tracing::error!("Fail integrity validation."))?;
186 tracing::info!("Succeed integrity validation.");
187 }
188
189 let format_version = snapshot.format_version;
190 if format_version < 2 {
191 unimplemented!("not supported: write model V1 to meta store V2");
192 } else {
193 dispatch(
194 target_id,
195 &opts,
196 LoaderV2::new(backup_store),
197 WriterModelV2ToMetaStoreV2::new(meta_store),
198 )
199 .await?;
200 }
201
202 Ok(())
203}
204
205async fn dispatch<L: Loader<S>, W: Writer<S>, S: Metadata>(
206 target_id: MetaSnapshotId,
207 opts: &RestoreOpts,
208 loader: L,
209 writer: W,
210) -> BackupResult<()> {
211 if opts.overwrite_hummock_storage_endpoint
213 && (opts.overwrite_backup_storage_url.is_none()
214 || opts.overwrite_backup_storage_directory.is_none())
215 {
216 return Err(BackupError::Other(anyhow::anyhow!("overwrite_hummock_storage_endpoint, overwrite_backup_storage_url, overwrite_backup_storage_directory must be set simultaneously".to_owned())));
217 }
218
219 let target_snapshot = loader.load(target_id).await?;
221 if !opts.overwrite_hummock_storage_endpoint {
222 let storage_url = target_snapshot.metadata.storage_url()?;
223 if storage_url != opts.hummock_storage_url {
224 return Err(BackupError::Other(anyhow::anyhow!(
225 "storage_url mismatch: {} {}",
226 storage_url,
227 opts.hummock_storage_url
228 )));
229 }
230 let storage_directory = target_snapshot.metadata.storage_directory()?;
231 if storage_directory != opts.hummock_storage_directory {
232 return Err(BackupError::Other(anyhow::anyhow!(
233 "storage_directory mismatch: {} {}",
234 storage_directory,
235 opts.hummock_storage_directory
236 )));
237 }
238 }
239
240 if opts.dry_run {
241 tracing::info!("Complete dry run.");
242 return Ok(());
243 }
244 let hummock_version = target_snapshot.metadata.hummock_version_ref().clone();
245 writer.write(target_snapshot).await?;
246 if opts.overwrite_hummock_storage_endpoint {
247 writer
248 .overwrite(
249 &format!("hummock+{}", opts.hummock_storage_url),
250 &opts.hummock_storage_directory,
251 opts.overwrite_backup_storage_url.as_ref().unwrap(),
252 opts.overwrite_backup_storage_directory.as_ref().unwrap(),
253 )
254 .await?;
255 }
256
257 restore_hummock_version(
259 &opts.hummock_storage_url,
260 &opts.hummock_storage_directory,
261 &hummock_version,
262 )
263 .await?;
264 Ok(())
265}
266
267pub async fn restore(opts: RestoreOpts) -> BackupResult<()> {
268 tracing::info!("restore with opts: {:#?}", opts);
269 let result = restore_impl(opts, None, None).await;
270 match &result {
271 Ok(_) => {
272 tracing::info!("command succeeded");
273 }
274 Err(e) => {
275 tracing::warn!(error = %e.as_report(), "command failed");
276 }
277 }
278 result
279}
280
281async fn validate_integrity(
282 mut object_ids: HashSet<HummockRawObjectId>,
283 hummock_storage_url: &str,
284 hummock_storage_directory: &str,
285) -> BackupResult<()> {
286 tracing::info!("expect {} objects", object_ids.len());
287 let object_store = Arc::new(
288 build_remote_object_store(
289 hummock_storage_url,
290 Arc::new(ObjectStoreMetrics::unused()),
291 "Version Checkpoint",
292 Arc::new(ObjectStoreConfig::default()),
293 )
294 .await,
295 );
296 let mut iter = object_store
297 .list(hummock_storage_directory, None, None)
298 .await?;
299 while let Some(obj) = iter.try_next().await? {
300 let Some(obj_id) = try_get_object_id_from_path(&obj.key) else {
301 continue;
302 };
303 if object_ids.remove(&obj_id.as_raw()) && object_ids.is_empty() {
304 break;
305 }
306 }
307 if object_ids.is_empty() {
308 return Ok(());
309 }
310 Err(BackupError::Other(anyhow!(
311 "referenced objects not found in object store: {:?}",
312 object_ids
313 )))
314}