risingwave_meta/backup_restore/
restore.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use futures::TryStreamExt;
20use risingwave_backup::MetaSnapshotId;
21use risingwave_backup::error::{BackupError, BackupResult};
22use risingwave_backup::meta_snapshot::Metadata;
23use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
24use risingwave_common::config::{MetaBackend, ObjectStoreConfig};
25use risingwave_hummock_sdk::version::HummockVersion;
26use risingwave_hummock_sdk::{
27 HummockRawObjectId, try_get_object_id_from_path, version_checkpoint_path,
28};
29use risingwave_object_store::object::build_remote_object_store;
30use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
31use risingwave_pb::hummock::PbHummockVersionCheckpoint;
32use thiserror_ext::AsReport;
33
34use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2};
35use crate::backup_restore::restore_impl::{Loader, Writer};
36use crate::backup_restore::utils::{get_backup_store, get_meta_store};
37use crate::controller::SqlMetaStore;
38
/// Command-line options for restoring a meta snapshot.
#[derive(clap::Args, Debug, Clone)]
pub struct RestoreOpts {
    /// Id of the snapshot to restore. It must be listed in the backup store's manifest.
    #[clap(long)]
    pub meta_snapshot_id: u64,
    // NOTE(review): not read in this file; presumably selects the meta store backend built by
    // `get_meta_store` — confirm against `utils`.
    #[clap(long, value_enum, default_value_t = MetaBackend::Mem)]
    pub meta_store_type: MetaBackend,
    // NOTE(review): the `sql_*` options below are not read in this file; presumably they are
    // the connection settings consumed by `get_meta_store` — confirm against `utils`.
    #[clap(long, default_value_t = String::from(""))]
    pub sql_endpoint: String,
    #[clap(long, default_value = "")]
    pub sql_username: String,
    #[clap(long, default_value = "")]
    pub sql_password: String,
    #[clap(long, default_value = "")]
    pub sql_database: String,
    #[clap(long, required = false)]
    pub sql_url_params: Option<String>,
    // NOTE(review): `backup_storage_url` and `backup_storage_directory` are not read in this
    // file; presumably they locate the snapshot storage opened by `get_backup_store` — confirm.
    #[clap(long)]
    pub backup_storage_url: String,
    #[clap(long, default_value_t = String::from("backup"))]
    pub backup_storage_directory: String,
    /// URL of the Hummock object store. The restored version checkpoint is uploaded to it, and
    /// integrity validation lists objects from it.
    #[clap(long)]
    pub hummock_storage_url: String,
    /// Directory inside the Hummock object store that holds the version checkpoint and the
    /// objects checked by integrity validation.
    #[clap(long, default_value_t = String::from("hummock_001"))]
    pub hummock_storage_directory: String,
    /// Only load the snapshot; skip writing to the meta store and to Hummock storage.
    #[clap(long, default_value_t = false)]
    pub dry_run: bool,
    // NOTE(review): `read_attempt_timeout_ms` and `read_retry_attempts` are not read in this
    // file; presumably they tune object store reads elsewhere — confirm.
    #[clap(long, default_value_t = 600000)]
    pub read_attempt_timeout_ms: u64,
    #[clap(long, default_value_t = 3)]
    pub read_retry_attempts: u64,
    /// After writing the snapshot, overwrite the stored Hummock and backup storage locations.
    /// Requires `overwrite_backup_storage_url` and `overwrite_backup_storage_directory`.
    #[clap(long, default_value_t = false)]
    pub overwrite_hummock_storage_endpoint: bool,
    /// Backup storage URL passed to the writer when `overwrite_hummock_storage_endpoint` is set.
    #[clap(long, required = false)]
    pub overwrite_backup_storage_url: Option<String>,
    /// Backup storage directory passed to the writer when `overwrite_hummock_storage_endpoint`
    /// is set.
    #[clap(long, required = false)]
    pub overwrite_backup_storage_directory: Option<String>,
    /// Before restoring, check that every object referenced by the snapshot is present in the
    /// Hummock storage directory.
    #[clap(long, default_value_t = false)]
    pub validate_integrity: bool,
}
98
99async fn restore_hummock_version(
100 hummock_storage_url: &str,
101 hummock_storage_directory: &str,
102 hummock_version: &HummockVersion,
103) -> BackupResult<()> {
104 let object_store = Arc::new(
105 build_remote_object_store(
106 hummock_storage_url,
107 Arc::new(ObjectStoreMetrics::unused()),
108 "Version Checkpoint",
109 Arc::new(ObjectStoreConfig::default()),
110 )
111 .await,
112 );
113 let checkpoint_path = version_checkpoint_path(hummock_storage_directory);
114 let checkpoint = PbHummockVersionCheckpoint {
115 version: Some(hummock_version.into()),
116 stale_objects: Default::default(),
118 };
119 use prost::Message;
120 let buf = checkpoint.encode_to_vec();
121 object_store
122 .upload(&checkpoint_path, buf.into())
123 .await
124 .map_err(|e| BackupError::StateStorage(e.into()))?;
125 Ok(())
126}
127
128async fn restore_impl(
132 opts: RestoreOpts,
133 meta_store: Option<SqlMetaStore>,
134 backup_store: Option<MetaSnapshotStorageRef>,
135) -> BackupResult<()> {
136 if cfg!(not(test)) {
137 assert!(meta_store.is_none());
138 assert!(backup_store.is_none());
139 }
140 let meta_store = match meta_store {
141 None => get_meta_store(opts.clone()).await?,
142 Some(m) => m,
143 };
144 let backup_store = match backup_store {
145 None => get_backup_store(opts.clone()).await?,
146 Some(b) => b,
147 };
148 let target_id = opts.meta_snapshot_id;
149 let snapshot_list = &backup_store.manifest().snapshot_metadata;
150 let snapshot = match snapshot_list.iter().find(|m| m.id == target_id) {
151 None => {
152 return Err(BackupError::Other(anyhow::anyhow!(
153 "snapshot id {} not found",
154 target_id
155 )));
156 }
157 Some(s) => s,
158 };
159
160 if opts.validate_integrity {
161 tracing::info!("Start integrity validation.");
162 validate_integrity(
163 snapshot.objects.clone(),
164 &opts.hummock_storage_url,
165 &opts.hummock_storage_directory,
166 )
167 .await
168 .inspect_err(|_| tracing::error!("Fail integrity validation."))?;
169 tracing::info!("Succeed integrity validation.");
170 }
171
172 let format_version = snapshot.format_version;
173 if format_version < 2 {
174 unimplemented!("not supported: write model V1 to meta store V2");
175 } else {
176 dispatch(
177 target_id,
178 &opts,
179 LoaderV2::new(backup_store),
180 WriterModelV2ToMetaStoreV2::new(meta_store.to_owned()),
181 )
182 .await?;
183 }
184
185 Ok(())
186}
187
188async fn dispatch<L: Loader<S>, W: Writer<S>, S: Metadata>(
189 target_id: MetaSnapshotId,
190 opts: &RestoreOpts,
191 loader: L,
192 writer: W,
193) -> BackupResult<()> {
194 if opts.overwrite_hummock_storage_endpoint
196 && (opts.overwrite_backup_storage_url.is_none()
197 || opts.overwrite_backup_storage_directory.is_none())
198 {
199 return Err(BackupError::Other(anyhow::anyhow!("overwrite_hummock_storage_endpoint, overwrite_backup_storage_url, overwrite_backup_storage_directory must be set simultaneously".to_owned())));
200 }
201
202 let target_snapshot = loader.load(target_id).await?;
204 if opts.dry_run {
205 tracing::info!("Complete dry run.");
206 return Ok(());
207 }
208 let hummock_version = target_snapshot.metadata.hummock_version_ref().clone();
209 writer.write(target_snapshot).await?;
210 if opts.overwrite_hummock_storage_endpoint {
211 writer
212 .overwrite(
213 &format!("hummock+{}", opts.hummock_storage_url),
214 &opts.hummock_storage_directory,
215 opts.overwrite_backup_storage_url.as_ref().unwrap(),
216 opts.overwrite_backup_storage_directory.as_ref().unwrap(),
217 )
218 .await?;
219 }
220
221 restore_hummock_version(
223 &opts.hummock_storage_url,
224 &opts.hummock_storage_directory,
225 &hummock_version,
226 )
227 .await?;
228 Ok(())
229}
230
231pub async fn restore(opts: RestoreOpts) -> BackupResult<()> {
232 tracing::info!("restore with opts: {:#?}", opts);
233 let result = restore_impl(opts, None, None).await;
234 match &result {
235 Ok(_) => {
236 tracing::info!("command succeeded");
237 }
238 Err(e) => {
239 tracing::warn!(error = %e.as_report(), "command failed");
240 }
241 }
242 result
243}
244
245async fn validate_integrity(
246 mut object_ids: HashSet<HummockRawObjectId>,
247 hummock_storage_url: &str,
248 hummock_storage_directory: &str,
249) -> BackupResult<()> {
250 tracing::info!("expect {} objects", object_ids.len());
251 let object_store = Arc::new(
252 build_remote_object_store(
253 hummock_storage_url,
254 Arc::new(ObjectStoreMetrics::unused()),
255 "Version Checkpoint",
256 Arc::new(ObjectStoreConfig::default()),
257 )
258 .await,
259 );
260 let mut iter = object_store
261 .list(hummock_storage_directory, None, None)
262 .await?;
263 while let Some(obj) = iter.try_next().await? {
264 let Some(obj_id) = try_get_object_id_from_path(&obj.key) else {
265 continue;
266 };
267 if object_ids.remove(&obj_id.as_raw()) && object_ids.is_empty() {
268 break;
269 }
270 }
271 if object_ids.is_empty() {
272 return Ok(());
273 }
274 Err(BackupError::Other(anyhow!(
275 "referenced objects not found in object store: {:?}",
276 object_ids
277 )))
278}