risingwave_meta/backup_restore/
restore.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use futures::TryStreamExt;
20use risingwave_backup::MetaSnapshotId;
21use risingwave_backup::error::{BackupError, BackupResult};
22use risingwave_backup::meta_snapshot::Metadata;
23use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
24use risingwave_common::config::{MetaBackend, ObjectStoreConfig};
25use risingwave_hummock_sdk::version::HummockVersion;
26use risingwave_hummock_sdk::{HummockSstableObjectId, OBJECT_SUFFIX, version_checkpoint_path};
27use risingwave_object_store::object::build_remote_object_store;
28use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
29use risingwave_pb::hummock::PbHummockVersionCheckpoint;
30use thiserror_ext::AsReport;
31
32use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2};
33use crate::backup_restore::restore_impl::{Loader, Writer};
34use crate::backup_restore::utils::{get_backup_store, get_meta_store};
35use crate::controller::SqlMetaStore;
36
37#[derive(clap::Args, Debug, Clone)]
39pub struct RestoreOpts {
40 #[clap(long)]
43 pub meta_snapshot_id: u64,
44 #[clap(long, value_enum, default_value_t = MetaBackend::Mem)]
46 pub meta_store_type: MetaBackend,
47 #[clap(long, default_value_t = String::from(""))]
48 pub sql_endpoint: String,
49 #[clap(long, default_value = "")]
51 pub sql_username: String,
52 #[clap(long, default_value = "")]
54 pub sql_password: String,
55 #[clap(long, default_value = "")]
57 pub sql_database: String,
58 #[clap(long)]
60 pub backup_storage_url: String,
61 #[clap(long, default_value_t = String::from("backup"))]
63 pub backup_storage_directory: String,
64 #[clap(long)]
66 pub hummock_storage_url: String,
67 #[clap(long, default_value_t = String::from("hummock_001"))]
69 pub hummock_storage_directory: String,
70 #[clap(long, default_value_t = false)]
72 pub dry_run: bool,
73 #[clap(long, default_value_t = 600000)]
75 pub read_attempt_timeout_ms: u64,
76 #[clap(long, default_value_t = 3)]
78 pub read_retry_attempts: u64,
79 #[clap(long, default_value_t = false)]
80 pub overwrite_hummock_storage_endpoint: bool,
84 #[clap(long, required = false)]
85 pub overwrite_backup_storage_url: Option<String>,
86 #[clap(long, required = false)]
87 pub overwrite_backup_storage_directory: Option<String>,
88 #[clap(long, default_value_t = false)]
90 pub validate_integrity: bool,
91}
92
93async fn restore_hummock_version(
94 hummock_storage_url: &str,
95 hummock_storage_directory: &str,
96 hummock_version: &HummockVersion,
97) -> BackupResult<()> {
98 let object_store = Arc::new(
99 build_remote_object_store(
100 hummock_storage_url,
101 Arc::new(ObjectStoreMetrics::unused()),
102 "Version Checkpoint",
103 Arc::new(ObjectStoreConfig::default()),
104 )
105 .await,
106 );
107 let checkpoint_path = version_checkpoint_path(hummock_storage_directory);
108 let checkpoint = PbHummockVersionCheckpoint {
109 version: Some(hummock_version.into()),
110 stale_objects: Default::default(),
112 };
113 use prost::Message;
114 let buf = checkpoint.encode_to_vec();
115 object_store
116 .upload(&checkpoint_path, buf.into())
117 .await
118 .map_err(|e| BackupError::StateStorage(e.into()))?;
119 Ok(())
120}
121
122async fn restore_impl(
126 opts: RestoreOpts,
127 meta_store: Option<SqlMetaStore>,
128 backup_store: Option<MetaSnapshotStorageRef>,
129) -> BackupResult<()> {
130 if cfg!(not(test)) {
131 assert!(meta_store.is_none());
132 assert!(backup_store.is_none());
133 }
134 let meta_store = match meta_store {
135 None => get_meta_store(opts.clone()).await?,
136 Some(m) => m,
137 };
138 let backup_store = match backup_store {
139 None => get_backup_store(opts.clone()).await?,
140 Some(b) => b,
141 };
142 let target_id = opts.meta_snapshot_id;
143 let snapshot_list = &backup_store.manifest().snapshot_metadata;
144 let snapshot = match snapshot_list.iter().find(|m| m.id == target_id) {
145 None => {
146 return Err(BackupError::Other(anyhow::anyhow!(
147 "snapshot id {} not found",
148 target_id
149 )));
150 }
151 Some(s) => s,
152 };
153
154 if opts.validate_integrity {
155 tracing::info!("Start integrity validation.");
156 validate_integrity(
157 snapshot.ssts.clone(),
158 &opts.hummock_storage_url,
159 &opts.hummock_storage_directory,
160 )
161 .await
162 .inspect_err(|_| tracing::error!("Fail integrity validation."))?;
163 tracing::info!("Succeed integrity validation.");
164 }
165
166 let format_version = snapshot.format_version;
167 if format_version < 2 {
168 unimplemented!("not supported: write model V1 to meta store V2");
169 } else {
170 dispatch(
171 target_id,
172 &opts,
173 LoaderV2::new(backup_store),
174 WriterModelV2ToMetaStoreV2::new(meta_store.to_owned()),
175 )
176 .await?;
177 }
178
179 Ok(())
180}
181
182async fn dispatch<L: Loader<S>, W: Writer<S>, S: Metadata>(
183 target_id: MetaSnapshotId,
184 opts: &RestoreOpts,
185 loader: L,
186 writer: W,
187) -> BackupResult<()> {
188 if opts.overwrite_hummock_storage_endpoint
190 && (opts.overwrite_backup_storage_url.is_none()
191 || opts.overwrite_backup_storage_directory.is_none())
192 {
193 return Err(BackupError::Other(anyhow::anyhow!("overwrite_hummock_storage_endpoint, overwrite_backup_storage_url, overwrite_backup_storage_directory must be set simultaneously".to_owned())));
194 }
195
196 let target_snapshot = loader.load(target_id).await?;
198 if opts.dry_run {
199 tracing::info!("Complete dry run.");
200 return Ok(());
201 }
202 let hummock_version = target_snapshot.metadata.hummock_version_ref().clone();
203 writer.write(target_snapshot).await?;
204 if opts.overwrite_hummock_storage_endpoint {
205 writer
206 .overwrite(
207 &format!("hummock+{}", opts.hummock_storage_url),
208 &opts.hummock_storage_directory,
209 opts.overwrite_backup_storage_url.as_ref().unwrap(),
210 opts.overwrite_backup_storage_directory.as_ref().unwrap(),
211 )
212 .await?;
213 }
214
215 restore_hummock_version(
217 &opts.hummock_storage_url,
218 &opts.hummock_storage_directory,
219 &hummock_version,
220 )
221 .await?;
222 Ok(())
223}
224
225pub async fn restore(opts: RestoreOpts) -> BackupResult<()> {
226 tracing::info!("restore with opts: {:#?}", opts);
227 let result = restore_impl(opts, None, None).await;
228 match &result {
229 Ok(_) => {
230 tracing::info!("command succeeded");
231 }
232 Err(e) => {
233 tracing::warn!(error = %e.as_report(), "command failed");
234 }
235 }
236 result
237}
238
239async fn validate_integrity(
240 mut object_ids: HashSet<HummockSstableObjectId>,
241 hummock_storage_url: &str,
242 hummock_storage_directory: &str,
243) -> BackupResult<()> {
244 fn try_get_object_id_from_path(path: &str) -> Option<HummockSstableObjectId> {
245 let split: Vec<_> = path.split(&['/', '.']).collect();
246 if split.len() <= 2 {
247 return None;
248 }
249 if split[split.len() - 1] != OBJECT_SUFFIX {
250 return None;
251 }
252 let id = split[split.len() - 2]
253 .parse::<HummockSstableObjectId>()
254 .unwrap_or_else(|_| panic!("expect valid sst id, got {}", split[split.len() - 2]));
255 Some(id)
256 }
257 tracing::info!("expect {} objects", object_ids.len());
258 let object_store = Arc::new(
259 build_remote_object_store(
260 hummock_storage_url,
261 Arc::new(ObjectStoreMetrics::unused()),
262 "Version Checkpoint",
263 Arc::new(ObjectStoreConfig::default()),
264 )
265 .await,
266 );
267 let mut iter = object_store
268 .list(hummock_storage_directory, None, None)
269 .await?;
270 while let Some(obj) = iter.try_next().await? {
271 let Some(obj_id) = try_get_object_id_from_path(&obj.key) else {
272 continue;
273 };
274 if object_ids.remove(&obj_id) && object_ids.is_empty() {
275 break;
276 }
277 }
278 if object_ids.is_empty() {
279 return Ok(());
280 }
281 Err(BackupError::Other(anyhow!(
282 "referenced objects not found in object store: {:?}",
283 object_ids
284 )))
285}