risingwave_meta/backup_restore/
utils.rs
1use std::sync::Arc;
16
17use risingwave_backup::error::{BackupError, BackupResult};
18use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage};
19use risingwave_common::config::{MetaBackend, MetaStoreConfig, ObjectStoreConfig};
20use risingwave_object_store::object::build_remote_object_store;
21use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
22
23use crate::MetaStoreBackend;
24use crate::backup_restore::RestoreOpts;
25use crate::controller::SqlMetaStore;
26
27pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult<SqlMetaStore> {
29 let meta_store_backend = match opts.meta_store_type {
30 MetaBackend::Mem => MetaStoreBackend::Mem,
31 MetaBackend::Sql => MetaStoreBackend::Sql {
32 endpoint: opts.sql_endpoint,
33 config: MetaStoreConfig::default(),
34 },
35 MetaBackend::Sqlite => MetaStoreBackend::Sql {
36 endpoint: format!("sqlite://{}?mode=rwc", opts.sql_endpoint),
37 config: MetaStoreConfig::default(),
38 },
39 MetaBackend::Postgres => MetaStoreBackend::Sql {
40 endpoint: format!(
41 "postgres://{}:{}@{}/{}",
42 opts.sql_username, opts.sql_password, opts.sql_endpoint, opts.sql_database
43 ),
44 config: MetaStoreConfig::default(),
45 },
46 MetaBackend::Mysql => MetaStoreBackend::Sql {
47 endpoint: format!(
48 "mysql://{}:{}@{}/{}",
49 opts.sql_username, opts.sql_password, opts.sql_endpoint, opts.sql_database
50 ),
51 config: MetaStoreConfig::default(),
52 },
53 };
54
55 SqlMetaStore::connect(meta_store_backend)
56 .await
57 .map_err(|e| BackupError::MetaStorage(e.into()))
58}
59
60pub async fn get_backup_store(opts: RestoreOpts) -> BackupResult<MetaSnapshotStorageRef> {
61 let mut config = ObjectStoreConfig::default();
62 config.retry.read_attempt_timeout_ms = opts.read_attempt_timeout_ms;
63 config.retry.read_retry_attempts = opts.read_retry_attempts as usize;
64
65 let object_store = build_remote_object_store(
66 &opts.backup_storage_url,
67 Arc::new(ObjectStoreMetrics::unused()),
68 "Meta Backup",
69 Arc::new(config),
70 )
71 .await;
72 let backup_store =
73 ObjectStoreMetaSnapshotStorage::new(&opts.backup_storage_directory, Arc::new(object_store))
74 .await?;
75 Ok(Arc::new(backup_store))
76}