risingwave_meta/backup_restore/
utils.rs1use std::sync::Arc;
16
17use risingwave_backup::error::{BackupError, BackupResult};
18use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage};
19use risingwave_common::config::{MetaBackend, MetaStoreConfig, ObjectStoreConfig};
20use risingwave_object_store::object::build_remote_object_store;
21use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
22
23use crate::MetaStoreBackend;
24use crate::backup_restore::RestoreOpts;
25use crate::controller::SqlMetaStore;
26
27pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult<SqlMetaStore> {
29 let meta_store_backend = match opts.meta_store_type {
30 MetaBackend::Mem => MetaStoreBackend::Mem,
31 MetaBackend::Sql => MetaStoreBackend::Sql {
32 endpoint: opts.sql_endpoint,
33 config: MetaStoreConfig::default(),
34 },
35 MetaBackend::Sqlite => MetaStoreBackend::Sql {
36 endpoint: format!("sqlite://{}?mode=rwc", opts.sql_endpoint),
37 config: MetaStoreConfig::default(),
38 },
39 MetaBackend::Postgres => MetaStoreBackend::Sql {
40 endpoint: format!(
41 "postgres://{}:{}@{}/{}{}",
42 opts.sql_username,
43 opts.sql_password,
44 opts.sql_endpoint,
45 opts.sql_database,
46 if let Some(params) = &opts.sql_url_params
47 && !params.is_empty()
48 {
49 format!("?{}", params)
50 } else {
51 "".to_owned()
52 }
53 ),
54 config: MetaStoreConfig::default(),
55 },
56 MetaBackend::Mysql => MetaStoreBackend::Sql {
57 endpoint: format!(
58 "mysql://{}:{}@{}/{}{}",
59 opts.sql_username,
60 opts.sql_password,
61 opts.sql_endpoint,
62 opts.sql_database,
63 if let Some(params) = &opts.sql_url_params
64 && !params.is_empty()
65 {
66 format!("?{}", params)
67 } else {
68 "".to_owned()
69 }
70 ),
71 config: MetaStoreConfig::default(),
72 },
73 };
74
75 SqlMetaStore::connect(meta_store_backend)
76 .await
77 .map_err(|e| BackupError::MetaStorage(e.into()))
78}
79
80pub async fn get_backup_store(opts: RestoreOpts) -> BackupResult<MetaSnapshotStorageRef> {
81 let mut config = ObjectStoreConfig::default();
82 config.retry.read_attempt_timeout_ms = opts.read_attempt_timeout_ms;
83 config.retry.read_retry_attempts = opts.read_retry_attempts as usize;
84
85 let object_store = build_remote_object_store(
86 &opts.backup_storage_url,
87 Arc::new(ObjectStoreMetrics::unused()),
88 "Meta Backup",
89 Arc::new(config),
90 )
91 .await;
92 let backup_store =
93 ObjectStoreMetaSnapshotStorage::new(&opts.backup_storage_directory, Arc::new(object_store))
94 .await?;
95 Ok(Arc::new(backup_store))
96}