risingwave_meta/backup_restore/restore_impl/v2.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp;
16
17use itertools::Itertools;
18use risingwave_backup::MetaSnapshotId;
19use risingwave_backup::error::{BackupError, BackupResult};
20use risingwave_backup::meta_snapshot::MetaSnapshot;
21use risingwave_backup::meta_snapshot_v2::{MetaSnapshotV2, MetadataV2};
22use risingwave_backup::storage::{MetaSnapshotStorage, MetaSnapshotStorageRef};
23use sea_orm::{DbErr, EntityTrait};
24
25use crate::backup_restore::restore_impl::{Loader, Writer};
26use crate::controller::SqlMetaStore;
27
28pub struct LoaderV2 {
29    backup_store: MetaSnapshotStorageRef,
30}
31
32impl LoaderV2 {
33    pub fn new(backup_store: MetaSnapshotStorageRef) -> Self {
34        Self { backup_store }
35    }
36}
37
38#[async_trait::async_trait]
39impl Loader<MetadataV2> for LoaderV2 {
40    async fn load(&self, target_id: MetaSnapshotId) -> BackupResult<MetaSnapshot<MetadataV2>> {
41        let snapshot_list = &self.backup_store.manifest().await.snapshot_metadata;
42        let mut target_snapshot: MetaSnapshotV2 = self.backup_store.get(target_id).await?;
43        tracing::debug!(
44            "snapshot {} before rewrite:\n{}",
45            target_id,
46            target_snapshot
47        );
48        let newest_id = snapshot_list
49            .iter()
50            .map(|m| m.id)
51            .max()
52            .expect("should exist");
53        assert!(
54            newest_id >= target_id,
55            "newest_id={}, target_id={}",
56            newest_id,
57            target_id
58        );
59
60        // validate and rewrite seq
61        if newest_id > target_id {
62            let newest_snapshot: MetaSnapshotV2 = self.backup_store.get(newest_id).await?;
63            for seq in &target_snapshot.metadata.hummock_sequences {
64                let newest = newest_snapshot
65                    .metadata
66                    .hummock_sequences
67                    .iter()
68                    .find(|s| s.name == seq.name)
69                    .unwrap_or_else(|| {
70                        panic!(
71                            "violate superset requirement. Hummock sequence name {}",
72                            seq.name
73                        )
74                    });
75                assert!(newest.seq >= seq.seq, "violate monotonicity requirement");
76            }
77            target_snapshot.metadata.hummock_sequences = newest_snapshot.metadata.hummock_sequences;
78            tracing::info!(
79                "snapshot {} is rewritten by snapshot {}:\n",
80                target_id,
81                newest_id,
82            );
83            tracing::debug!("{target_snapshot}");
84        }
85        Ok(target_snapshot)
86    }
87}
88
89pub struct WriterModelV2ToMetaStoreV2 {
90    meta_store: SqlMetaStore,
91}
92
93impl WriterModelV2ToMetaStoreV2 {
94    pub fn new(meta_store: SqlMetaStore) -> Self {
95        Self { meta_store }
96    }
97}
98
99#[async_trait::async_trait]
100impl Writer<MetadataV2> for WriterModelV2ToMetaStoreV2 {
101    async fn write(&self, target_snapshot: MetaSnapshot<MetadataV2>) -> BackupResult<()> {
102        let metadata = target_snapshot.metadata;
103        let db = &self.meta_store.conn;
104        ensure_all_meta_store_tables_are_empty(db).await?;
105        insert_models(metadata.seaql_migrations.clone(), db).await?;
106        insert_models(metadata.clusters.clone(), db).await?;
107        insert_models(metadata.version_stats.clone(), db).await?;
108        insert_models(metadata.compaction_configs.clone(), db).await?;
109        insert_models(metadata.hummock_sequences.clone(), db).await?;
110        insert_models(metadata.workers.clone(), db).await?;
111        insert_models(metadata.worker_properties.clone(), db).await?;
112        insert_models(metadata.users.clone(), db).await?;
113        // The sort is required to pass table's foreign key check.
114        use risingwave_meta_model::object::ObjectType;
115        insert_models(
116            metadata
117                .objects
118                .iter()
119                .sorted_by(|a, b| match (a.obj_type, b.obj_type) {
120                    (ObjectType::Database, ObjectType::Database) => a.oid.cmp(&b.oid),
121                    (ObjectType::Database, _) => cmp::Ordering::Less,
122                    (_, ObjectType::Database) => cmp::Ordering::Greater,
123                    (ObjectType::Schema, ObjectType::Schema) => a.oid.cmp(&b.oid),
124                    (ObjectType::Schema, _) => cmp::Ordering::Less,
125                    (_, ObjectType::Schema) => cmp::Ordering::Greater,
126                    (_, _) => a.oid.cmp(&b.oid),
127                })
128                .cloned(),
129            db,
130        )
131        .await?;
132        insert_models(
133            metadata
134                .user_privileges
135                .iter()
136                .sorted_by_key(|u| u.id)
137                .cloned(),
138            db,
139        )
140        .await?;
141        insert_models(metadata.object_dependencies.clone(), db).await?;
142        insert_models(metadata.databases.clone(), db).await?;
143        insert_models(metadata.schemas.clone(), db).await?;
144        insert_models(metadata.streaming_jobs.clone(), db).await?;
145        insert_models(metadata.fragments.clone(), db).await?;
146        insert_models(metadata.fragment_relation.clone(), db).await?;
147        insert_models(metadata.connections.clone(), db).await?;
148        insert_models(metadata.sources.clone(), db).await?;
149        insert_models(metadata.tables.clone(), db).await?;
150        insert_models(metadata.sinks.clone(), db).await?;
151        insert_models(metadata.views.clone(), db).await?;
152        insert_models(metadata.indexes.clone(), db).await?;
153        insert_models(metadata.functions.clone(), db).await?;
154        insert_models(metadata.system_parameters.clone(), db).await?;
155        insert_models(metadata.catalog_versions.clone(), db).await?;
156        insert_models(metadata.subscriptions.clone(), db).await?;
157        insert_models(metadata.session_parameters.clone(), db).await?;
158        insert_models(metadata.secrets.clone(), db).await?;
159        insert_models(metadata.exactly_once_iceberg_sinks.clone(), db).await?;
160        insert_models(metadata.iceberg_tables.clone(), db).await?;
161        insert_models(metadata.iceberg_namespace_properties.clone(), db).await?;
162        insert_models(metadata.user_default_privilege.clone(), db).await?;
163        insert_models(metadata.fragment_splits.clone(), db).await?;
164        insert_models(metadata.pending_sink_state.clone(), db).await?;
165        insert_models(metadata.refresh_jobs.clone(), db).await?;
166        insert_models(metadata.cdc_table_snapshot_splits.clone(), db).await?;
167        insert_models(metadata.hummock_table_change_logs.clone(), db).await?;
168        // update_auto_inc must be called last.
169        update_auto_inc(&metadata, db).await?;
170        Ok(())
171    }
172
173    async fn overwrite(
174        &self,
175        new_storage_url: &str,
176        new_storage_dir: &str,
177        new_backup_url: &str,
178        new_backup_dir: &str,
179    ) -> BackupResult<()> {
180        let kvs = [
181            ("state_store", new_storage_url),
182            ("data_directory", new_storage_dir),
183            ("backup_storage_url", new_backup_url),
184            ("backup_storage_directory", new_backup_dir),
185        ];
186        for (k, v) in kvs {
187            let Some(model) = risingwave_meta_model::system_parameter::Entity::find_by_id(k)
188                .one(&self.meta_store.conn)
189                .await
190                .map_err(map_db_err)?
191            else {
192                return Err(BackupError::MetaStorage(
193                    anyhow::anyhow!("{k} not found in system_parameter table").into(),
194                ));
195            };
196            let mut kv: risingwave_meta_model::system_parameter::ActiveModel = model.into();
197            kv.value = sea_orm::ActiveValue::Set(v.to_owned());
198            risingwave_meta_model::system_parameter::Entity::update(kv)
199                .exec(&self.meta_store.conn)
200                .await
201                .map_err(map_db_err)?;
202        }
203        Ok(())
204    }
205}
206
207async fn ensure_all_meta_store_tables_are_empty(
208    db: &impl sea_orm::ConnectionTrait,
209) -> BackupResult<()> {
210    macro_rules! ensure_entity_empty {
211        ($($entity_mod:ident),* $(,)?) => {
212            $(
213                if risingwave_meta_model::$entity_mod::Entity::find()
214                    .one(db)
215                    .await
216                    .map_err(map_db_err)?
217                    .is_some()
218                {
219                    return Err(BackupError::NonemptyMetaStorage);
220                }
221            )*
222        };
223    }
224
225    risingwave_meta_model::for_all_meta_model_entities!(ensure_entity_empty);
226    Ok(())
227}
228
229fn map_db_err(e: DbErr) -> BackupError {
230    BackupError::MetaStorage(e.into())
231}
232
/// Invokes `$macro!($metadata, $db, {..}, {..}, ...)` once, passing one
/// `{table, field, id_column}` entry per meta store table whose id column is reset by
/// `update_auto_inc`.
///
/// - `table`: the SQL table name, as a string literal.
/// - `field`: the `MetadataV2` field holding that table's rows.
/// - `id_column`: the auto-increment id column of the table.
#[macro_export]
macro_rules! for_all_auto_increment {
    ($metadata:ident, $db:ident, $macro:ident) => {
        $macro! ($metadata, $db,
            {"worker", workers, worker_id},
            {"object", objects, oid},
            {"user", users, user_id},
            {"user_privilege", user_privileges, id},
            {"fragment", fragments, fragment_id},
            {"object_dependency", object_dependencies, id}
        )
    };
}
246
/// For each `{table, model, id_field}` entry, moves the table's auto-increment counter
/// past the restored ids so that newly generated ids continue after them.
///
/// - MySQL: `ALTER TABLE <table> AUTO_INCREMENT = max(id) + 1`. The maximum is
///   computed from `$metadata.$model`, and the statement is skipped when the snapshot
///   has no rows for the table.
/// - Postgres: `setval('<table>_<id_field>_seq', MAX(<id_field>))`. The maximum is
///   read from the rows now in the table. NOTE(review): this assumes the default
///   `serial` sequence naming. When the table is empty, `MAX` is NULL; `setval` is
///   believed to be strict, making that a no-op — confirm.
/// - SQLite: nothing is done.
///
/// Must be expanded inside an async fn returning `BackupResult`, because it uses
/// `.await` and `?`. `map_db_err` must be in scope at the expansion site.
macro_rules! reset_sql_sequence {
    ($metadata:ident, $db:ident, $( {$table:expr, $model:ident, $id_field:ident} ),*) => {
        $(
        match $db.get_database_backend() {
            sea_orm::DbBackend::MySql => {
                // Next id = largest restored id + 1; no rows means nothing to adjust.
                if let Some(v) = $metadata.$model.iter().map(|w| w.$id_field + 1).max() {
                    $db.execute(sea_orm::Statement::from_string(
                        sea_orm::DatabaseBackend::MySql,
                        format!("ALTER TABLE {} AUTO_INCREMENT = {};", $table, v),
                    ))
                    .await
                    .map_err(map_db_err)?;
                }
            }
            sea_orm::DbBackend::Postgres => {
                // The table name is double-quoted because names such as `user` are
                // reserved words in PostgreSQL.
                $db.execute(sea_orm::Statement::from_string(
                    sea_orm::DatabaseBackend::Postgres,
                    format!("SELECT setval('{}_{}_seq', (SELECT MAX({}) FROM \"{}\"));", $table, stringify!($id_field), stringify!($id_field), $table),
                ))
                .await
                .map_err(map_db_err)?;
            }
            // No explicit reset is issued for SQLite.
            sea_orm::DbBackend::Sqlite => {}
            }
        )*
    };
}
274
275/// Fixes `auto_increment` fields.
276async fn update_auto_inc(
277    metadata: &MetadataV2,
278    db: &impl sea_orm::ConnectionTrait,
279) -> BackupResult<()> {
280    for_all_auto_increment!(metadata, db, reset_sql_sequence);
281    Ok(())
282}
283
284async fn insert_models<S, A>(
285    models: impl IntoIterator<Item = S>,
286    db: &impl sea_orm::ConnectionTrait,
287) -> BackupResult<()>
288where
289    S: sea_orm::ModelTrait + Sync + Send + Sized + sea_orm::IntoActiveModel<A>,
290    A: sea_orm::ActiveModelTrait + sea_orm::ActiveModelBehavior + Send + Sync + From<S>,
291    <<A as sea_orm::ActiveModelTrait>::Entity as sea_orm::EntityTrait>::Model:
292        sea_orm::IntoActiveModel<A>,
293{
294    use sea_orm::EntityTrait;
295    if <S as sea_orm::ModelTrait>::Entity::find()
296        .one(db)
297        .await
298        .map_err(map_db_err)?
299        .is_some()
300    {
301        return Err(BackupError::NonemptyMetaStorage);
302    }
303    for m in models {
304        m.into_active_model().insert(db).await.map_err(map_db_err)?;
305    }
306    Ok(())
307}