risingwave_meta/backup_restore/restore_impl/
v2.rs1use std::cmp;
16
17use itertools::Itertools;
18use risingwave_backup::MetaSnapshotId;
19use risingwave_backup::error::{BackupError, BackupResult};
20use risingwave_backup::meta_snapshot::MetaSnapshot;
21use risingwave_backup::meta_snapshot_v2::{MetaSnapshotV2, MetadataV2};
22use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
23use sea_orm::{DbErr, EntityTrait};
24
25use crate::backup_restore::restore_impl::{Loader, Writer};
26use crate::controller::SqlMetaStore;
27
28pub struct LoaderV2 {
29 backup_store: MetaSnapshotStorageRef,
30}
31
32impl LoaderV2 {
33 pub fn new(backup_store: MetaSnapshotStorageRef) -> Self {
34 Self { backup_store }
35 }
36}
37
38#[async_trait::async_trait]
39impl Loader<MetadataV2> for LoaderV2 {
40 async fn load(&self, target_id: MetaSnapshotId) -> BackupResult<MetaSnapshot<MetadataV2>> {
41 let snapshot_list = &self.backup_store.manifest().await.snapshot_metadata;
42 let mut target_snapshot: MetaSnapshotV2 = self.backup_store.get(target_id).await?;
43 tracing::debug!(
44 "snapshot {} before rewrite:\n{}",
45 target_id,
46 target_snapshot
47 );
48 let newest_id = snapshot_list
49 .iter()
50 .map(|m| m.id)
51 .max()
52 .expect("should exist");
53 assert!(
54 newest_id >= target_id,
55 "newest_id={}, target_id={}",
56 newest_id,
57 target_id
58 );
59
60 if newest_id > target_id {
62 let newest_snapshot: MetaSnapshotV2 = self.backup_store.get(newest_id).await?;
63 for seq in &target_snapshot.metadata.hummock_sequences {
64 let newest = newest_snapshot
65 .metadata
66 .hummock_sequences
67 .iter()
68 .find(|s| s.name == seq.name)
69 .unwrap_or_else(|| {
70 panic!(
71 "violate superset requirement. Hummock sequence name {}",
72 seq.name
73 )
74 });
75 assert!(newest.seq >= seq.seq, "violate monotonicity requirement");
76 }
77 target_snapshot.metadata.hummock_sequences = newest_snapshot.metadata.hummock_sequences;
78 tracing::info!(
79 "snapshot {} is rewritten by snapshot {}:\n",
80 target_id,
81 newest_id,
82 );
83 tracing::debug!("{target_snapshot}");
84 }
85 Ok(target_snapshot)
86 }
87}
88
89pub struct WriterModelV2ToMetaStoreV2 {
90 meta_store: SqlMetaStore,
91}
92
93impl WriterModelV2ToMetaStoreV2 {
94 pub fn new(meta_store: SqlMetaStore) -> Self {
95 Self { meta_store }
96 }
97}
98
99#[async_trait::async_trait]
100impl Writer<MetadataV2> for WriterModelV2ToMetaStoreV2 {
101 async fn write(&self, target_snapshot: MetaSnapshot<MetadataV2>) -> BackupResult<()> {
102 let metadata = target_snapshot.metadata;
103 let db = &self.meta_store.conn;
104 insert_models(metadata.seaql_migrations.clone(), db).await?;
105 insert_models(metadata.clusters.clone(), db).await?;
106 insert_models(metadata.version_stats.clone(), db).await?;
107 insert_models(metadata.compaction_configs.clone(), db).await?;
108 insert_models(metadata.hummock_sequences.clone(), db).await?;
109 insert_models(metadata.workers.clone(), db).await?;
110 insert_models(metadata.worker_properties.clone(), db).await?;
111 insert_models(metadata.users.clone(), db).await?;
112 use risingwave_meta_model::object::ObjectType;
114 insert_models(
115 metadata
116 .objects
117 .iter()
118 .sorted_by(|a, b| match (a.obj_type, b.obj_type) {
119 (ObjectType::Database, ObjectType::Database) => a.oid.cmp(&b.oid),
120 (ObjectType::Database, _) => cmp::Ordering::Less,
121 (_, ObjectType::Database) => cmp::Ordering::Greater,
122 (ObjectType::Schema, ObjectType::Schema) => a.oid.cmp(&b.oid),
123 (ObjectType::Schema, _) => cmp::Ordering::Less,
124 (_, ObjectType::Schema) => cmp::Ordering::Greater,
125 (_, _) => a.oid.cmp(&b.oid),
126 })
127 .cloned(),
128 db,
129 )
130 .await?;
131 insert_models(
132 metadata
133 .user_privileges
134 .iter()
135 .sorted_by_key(|u| u.id)
136 .cloned(),
137 db,
138 )
139 .await?;
140 insert_models(metadata.object_dependencies.clone(), db).await?;
141 insert_models(metadata.databases.clone(), db).await?;
142 insert_models(metadata.schemas.clone(), db).await?;
143 insert_models(metadata.streaming_jobs.clone(), db).await?;
144 insert_models(metadata.fragments.clone(), db).await?;
145 insert_models(metadata.fragment_relation.clone(), db).await?;
146 insert_models(metadata.connections.clone(), db).await?;
147 insert_models(metadata.sources.clone(), db).await?;
148 insert_models(metadata.tables.clone(), db).await?;
149 insert_models(metadata.sinks.clone(), db).await?;
150 insert_models(metadata.views.clone(), db).await?;
151 insert_models(metadata.indexes.clone(), db).await?;
152 insert_models(metadata.functions.clone(), db).await?;
153 insert_models(metadata.system_parameters.clone(), db).await?;
154 insert_models(metadata.catalog_versions.clone(), db).await?;
155 insert_models(metadata.subscriptions.clone(), db).await?;
156 insert_models(metadata.session_parameters.clone(), db).await?;
157 insert_models(metadata.secrets.clone(), db).await?;
158 insert_models(metadata.exactly_once_iceberg_sinks.clone(), db).await?;
159 insert_models(metadata.iceberg_tables.clone(), db).await?;
160 insert_models(metadata.iceberg_namespace_properties.clone(), db).await?;
161 insert_models(metadata.user_default_privilege.clone(), db).await?;
162 insert_models(metadata.fragment_splits.clone(), db).await?;
163 insert_models(metadata.pending_sink_state.clone(), db).await?;
164 insert_models(metadata.refresh_jobs.clone(), db).await?;
165 insert_models(metadata.cdc_table_snapshot_splits.clone(), db).await?;
166 insert_models(metadata.hummock_table_change_logs.clone(), db).await?;
167 update_auto_inc(&metadata, db).await?;
169 Ok(())
170 }
171
172 async fn overwrite(
173 &self,
174 new_storage_url: &str,
175 new_storage_dir: &str,
176 new_backup_url: &str,
177 new_backup_dir: &str,
178 ) -> BackupResult<()> {
179 let kvs = [
180 ("state_store", new_storage_url),
181 ("data_directory", new_storage_dir),
182 ("backup_storage_url", new_backup_url),
183 ("backup_storage_directory", new_backup_dir),
184 ];
185 for (k, v) in kvs {
186 let Some(model) = risingwave_meta_model::system_parameter::Entity::find_by_id(k)
187 .one(&self.meta_store.conn)
188 .await
189 .map_err(map_db_err)?
190 else {
191 return Err(BackupError::MetaStorage(
192 anyhow::anyhow!("{k} not found in system_parameter table").into(),
193 ));
194 };
195 let mut kv: risingwave_meta_model::system_parameter::ActiveModel = model.into();
196 kv.value = sea_orm::ActiveValue::Set(v.to_owned());
197 risingwave_meta_model::system_parameter::Entity::update(kv)
198 .exec(&self.meta_store.conn)
199 .await
200 .map_err(map_db_err)?;
201 }
202 Ok(())
203 }
204}
205
206fn map_db_err(e: DbErr) -> BackupError {
207 BackupError::MetaStorage(e.into())
208}
209
/// Invokes the macro named by the third argument, forwarding the first two identifiers
/// followed by one `{table name, metadata field, id column}` entry per listed table.
#[macro_export]
macro_rules! for_all_auto_increment {
    ($meta:ident, $conn:ident, $callback:ident) => {
        $callback!(
            $meta,
            $conn,
            {"worker", workers, worker_id},
            {"object", objects, oid},
            {"user", users, user_id},
            {"user_privilege", user_privileges, id},
            {"fragment", fragments, fragment_id},
            {"object_dependency", object_dependencies, id}
        )
    };
}
223
/// For each `{table name, metadata field, id column}` entry, brings the table's id generator
/// in line with the restored rows, depending on the backend reported by the connection:
///
/// - MySQL: sets `AUTO_INCREMENT` to the largest id in the metadata collection plus one;
///   nothing is executed when the collection is empty.
/// - Postgres: calls `setval` on `<table>_<id column>_seq` with the table's current `MAX(id)`.
/// - SQLite: nothing is executed.
///
/// Must be expanded inside an async function whose error type is `BackupError`, because the
/// expansion awaits and uses `?`.
macro_rules! reset_sql_sequence {
    ($meta:ident, $conn:ident, $( {$table_name:expr, $collection:ident, $id_col:ident} ),*) => {
        $(
            match $conn.get_database_backend() {
                backend @ sea_orm::DbBackend::MySql => {
                    let next_id = $meta.$collection.iter().map(|row| row.$id_col + 1).max();
                    if let Some(v) = next_id {
                        let sql = format!("ALTER TABLE {} AUTO_INCREMENT = {};", $table_name, v);
                        $conn
                            .execute(sea_orm::Statement::from_string(backend, sql))
                            .await
                            .map_err(map_db_err)?;
                    }
                }
                backend @ sea_orm::DbBackend::Postgres => {
                    let column = stringify!($id_col);
                    let sql = format!("SELECT setval('{}_{}_seq', (SELECT MAX({}) FROM \"{}\"));", $table_name, column, column, $table_name);
                    $conn
                        .execute(sea_orm::Statement::from_string(backend, sql))
                        .await
                        .map_err(map_db_err)?;
                }
                sea_orm::DbBackend::Sqlite => {}
            }
        )*
    };
}
251
252async fn update_auto_inc(
254 metadata: &MetadataV2,
255 db: &impl sea_orm::ConnectionTrait,
256) -> BackupResult<()> {
257 for_all_auto_increment!(metadata, db, reset_sql_sequence);
258 Ok(())
259}
260
261async fn insert_models<S, A>(
262 models: impl IntoIterator<Item = S>,
263 db: &impl sea_orm::ConnectionTrait,
264) -> BackupResult<()>
265where
266 S: sea_orm::ModelTrait + Sync + Send + Sized + sea_orm::IntoActiveModel<A>,
267 A: sea_orm::ActiveModelTrait + sea_orm::ActiveModelBehavior + Send + Sync + From<S>,
268 <<A as sea_orm::ActiveModelTrait>::Entity as sea_orm::EntityTrait>::Model:
269 sea_orm::IntoActiveModel<A>,
270{
271 use sea_orm::EntityTrait;
272 if <S as sea_orm::ModelTrait>::Entity::find()
273 .one(db)
274 .await
275 .map_err(map_db_err)?
276 .is_some()
277 {
278 return Err(BackupError::NonemptyMetaStorage);
279 }
280 for m in models {
281 m.into_active_model().insert(db).await.map_err(map_db_err)?;
282 }
283 Ok(())
284}