risingwave_meta/backup_restore/
utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_backup::error::{BackupError, BackupResult};
18use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage};
19use risingwave_common::config::{MetaBackend, MetaStoreConfig, ObjectStoreConfig};
20use risingwave_object_store::object::build_remote_object_store;
21use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
22
23use crate::MetaStoreBackend;
24use crate::backup_restore::RestoreOpts;
25use crate::controller::SqlMetaStore;
26
27// Code is copied from src/meta/src/rpc/server.rs. TODO #6482: extract method.
28pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult<SqlMetaStore> {
29    let meta_store_backend = match opts.meta_store_type {
30        MetaBackend::Mem => MetaStoreBackend::Mem,
31        MetaBackend::Sql => MetaStoreBackend::Sql {
32            endpoint: opts.sql_endpoint,
33            config: MetaStoreConfig::default(),
34        },
35        MetaBackend::Sqlite => MetaStoreBackend::Sql {
36            endpoint: format!("sqlite://{}?mode=rwc", opts.sql_endpoint),
37            config: MetaStoreConfig::default(),
38        },
39        MetaBackend::Postgres => MetaStoreBackend::Sql {
40            endpoint: format!(
41                "postgres://{}:{}@{}/{}{}",
42                opts.sql_username,
43                opts.sql_password,
44                opts.sql_endpoint,
45                opts.sql_database,
46                if let Some(params) = &opts.sql_url_params
47                    && !params.is_empty()
48                {
49                    format!("?{}", params)
50                } else {
51                    "".to_owned()
52                }
53            ),
54            config: MetaStoreConfig::default(),
55        },
56        MetaBackend::Mysql => MetaStoreBackend::Sql {
57            endpoint: format!(
58                "mysql://{}:{}@{}/{}{}",
59                opts.sql_username,
60                opts.sql_password,
61                opts.sql_endpoint,
62                opts.sql_database,
63                if let Some(params) = &opts.sql_url_params
64                    && !params.is_empty()
65                {
66                    format!("?{}", params)
67                } else {
68                    "".to_owned()
69                }
70            ),
71            config: MetaStoreConfig::default(),
72        },
73    };
74
75    SqlMetaStore::connect(meta_store_backend)
76        .await
77        .map_err(|e| BackupError::MetaStorage(e.into()))
78}
79
80pub async fn get_backup_store(opts: RestoreOpts) -> BackupResult<MetaSnapshotStorageRef> {
81    let mut config = ObjectStoreConfig::default();
82    config.retry.read_attempt_timeout_ms = opts.read_attempt_timeout_ms;
83    config.retry.read_retry_attempts = opts.read_retry_attempts as usize;
84
85    let object_store = build_remote_object_store(
86        &opts.backup_storage_url,
87        Arc::new(ObjectStoreMetrics::unused()),
88        "Meta Backup",
89        Arc::new(config),
90    )
91    .await;
92    let backup_store =
93        ObjectStoreMetaSnapshotStorage::new(&opts.backup_storage_directory, Arc::new(object_store))
94            .await?;
95    Ok(Arc::new(backup_store))
96}