risingwave_storage/hummock/
backup_reader.rs1use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::time::Duration;
20
21use arc_swap::ArcSwap;
22use futures::FutureExt;
23use futures::future::Shared;
24use risingwave_backup::error::BackupError;
25use risingwave_backup::meta_snapshot::{MetaSnapshot, Metadata};
26use risingwave_backup::storage::{MetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
27use risingwave_backup::{MetaSnapshotId, meta_snapshot_v1, meta_snapshot_v2};
28use risingwave_common::catalog::TableId;
29use risingwave_common::config::ObjectStoreConfig;
30use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
31use risingwave_common::system_param::reader::SystemParamsRead;
32use risingwave_object_store::object::build_remote_object_store;
33use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
34use thiserror_ext::AsReport;
35
36use crate::error::{StorageError, StorageResult};
37use crate::hummock::HummockError;
38use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion};
39
/// Reference-counted handle through which a [`BackupReader`] is shared.
40pub type BackupReaderRef = Arc<BackupReader>;
41
/// A cached [`PinnedVersion`] bundled with the receiving half of the channel whose sending half
/// was handed to it (see `build_version_holder`).
// NOTE(review): nothing in this file reads from the receiver; presumably it is stored only to
// keep the channel open for as long as the version stays cached — confirm.
42type VersionHolder = (
43 PinnedVersion,
44 tokio::sync::mpsc::UnboundedReceiver<PinVersionAction>,
45);
46
47async fn create_snapshot_store(
48 config: &StoreConfig,
49 object_store_config: &ObjectStoreConfig,
50) -> StorageResult<ObjectStoreMetaSnapshotStorage> {
51 let backup_object_store = Arc::new(
52 build_remote_object_store(
53 &config.0,
54 Arc::new(ObjectStoreMetrics::unused()),
55 "Meta Backup",
56 Arc::new(object_store_config.clone()),
57 )
58 .await,
59 );
60 let store = ObjectStoreMetaSnapshotStorage::new(&config.1, backup_object_store).await?;
61 Ok(store)
62}
63
/// Cloneable handle to a snapshot load that is still in progress. It resolves to the loaded
/// version, or to an error message on failure.
64type InflightRequest = Shared<Pin<Box<dyn Future<Output = Result<PinnedVersion, String>> + Send>>>;
/// Location of the backup storage. `BackupReader::new` fills `.0` with the storage URL and `.1`
/// with the storage directory.
65type StoreConfig = (String, String);
/// Serves hummock versions read out of meta snapshots in a backup store, caching each loaded
/// version by snapshot id.
67pub struct BackupReader {
    /// Versions that have already been loaded, keyed by snapshot id.
70 versions: parking_lot::RwLock<HashMap<MetaSnapshotId, VersionHolder>>,
    /// Loads currently in progress, keyed by snapshot id, so that concurrent callers asking for
    /// the same snapshot share one load.
71 inflight_request: parking_lot::Mutex<HashMap<MetaSnapshotId, InflightRequest>>,
    /// The snapshot storage in use, paired with the config it was built from. Swapped as a whole
    /// by `set_store`.
72 store: ArcSwap<(ObjectStoreMetaSnapshotStorage, StoreConfig)>,
    /// Sends manifest ids to the refresher task spawned in `with_store`.
73 refresh_tx: tokio::sync::mpsc::UnboundedSender<u64>,
    /// Object store settings reused whenever the snapshot storage is rebuilt.
74 object_store_config: ObjectStoreConfig,
75}
76
77impl BackupReader {
78 pub async fn new(
79 storage_url: &str,
80 storage_directory: &str,
81 object_store_config: &ObjectStoreConfig,
82 ) -> StorageResult<BackupReaderRef> {
83 let config = (storage_url.to_owned(), storage_directory.to_owned());
84 let store = create_snapshot_store(&config, object_store_config).await?;
85 tracing::info!(
86 "backup reader is initialized: url={}, dir={}",
87 config.0,
88 config.1
89 );
90 Ok(Self::with_store(
91 (store, config),
92 object_store_config.clone(),
93 ))
94 }
95
96 fn with_store(
97 store: (ObjectStoreMetaSnapshotStorage, StoreConfig),
98 object_store_config: ObjectStoreConfig,
99 ) -> BackupReaderRef {
100 let (refresh_tx, refresh_rx) = tokio::sync::mpsc::unbounded_channel();
101 let instance = Arc::new(Self {
102 store: ArcSwap::from_pointee(store),
103 versions: Default::default(),
104 inflight_request: Default::default(),
105 object_store_config,
106 refresh_tx,
107 });
108 tokio::spawn(Self::start_manifest_refresher(instance.clone(), refresh_rx));
109 instance
110 }
111
112 pub async fn unused() -> BackupReaderRef {
113 Self::with_store(
114 (
115 risingwave_backup::storage::unused().await,
116 StoreConfig::default(),
117 ),
118 ObjectStoreConfig::default(),
119 )
120 }
121
122 async fn set_store(&self, config: StoreConfig) -> StorageResult<()> {
123 let new_store = create_snapshot_store(&config, &self.object_store_config).await?;
124 tracing::info!(
125 "backup reader is updated: url={}, dir={}",
126 config.0,
127 config.1
128 );
129 self.store.store(Arc::new((new_store, config)));
130 Ok(())
131 }
132
133 async fn start_manifest_refresher(
135 backup_reader: BackupReaderRef,
136 mut refresh_rx: tokio::sync::mpsc::UnboundedReceiver<u64>,
137 ) {
138 loop {
139 let expect_manifest_id = refresh_rx.recv().await;
140 if expect_manifest_id.is_none() {
141 break;
142 }
143 let expect_manifest_id = expect_manifest_id.unwrap();
144 let current_store = backup_reader.store.load_full();
146 let previous_id = current_store.0.manifest().manifest_id;
147 if expect_manifest_id <= previous_id {
148 continue;
149 }
150 if let Err(e) = current_store.0.refresh_manifest().await {
151 tracing::warn!(error = %e.as_report(), "failed to refresh backup manifest, will retry");
153 let backup_reader_clone = backup_reader.clone();
154 tokio::spawn(async move {
155 tokio::time::sleep(Duration::from_secs(60)).await;
156 backup_reader_clone.try_refresh_manifest(expect_manifest_id);
157 });
158 continue;
159 }
160 let manifest: HashSet<MetaSnapshotId> = current_store
162 .0
163 .manifest()
164 .snapshot_metadata
165 .iter()
166 .map(|s| s.id)
167 .collect();
168 backup_reader
169 .versions
170 .write()
171 .retain(|k, _v| manifest.contains(k));
172 }
173 }
174
175 pub fn try_refresh_manifest(self: &BackupReaderRef, min_manifest_id: u64) {
176 let _ = self.refresh_tx.send(min_manifest_id).inspect_err(|_| {
177 tracing::warn!(min_manifest_id, "failed to send refresh_manifest request")
178 });
179 }
180
181 pub async fn try_get_hummock_version(
185 self: &BackupReaderRef,
186 table_id: TableId,
187 epoch: u64,
188 ) -> StorageResult<Option<PinnedVersion>> {
189 let current_store = self.store.load_full();
191 let Some(snapshot_metadata) = current_store
193 .0
194 .manifest()
195 .snapshot_metadata
196 .iter()
197 .find(|v| {
198 if let Some(m) = v.state_table_info.get(&table_id.table_id()) {
199 return epoch == m.committed_epoch;
200 }
201 false
202 })
203 .cloned()
204 else {
205 return Ok(None);
206 };
207 let snapshot_id = snapshot_metadata.id;
208 let future = {
210 let mut req_guard = self.inflight_request.lock();
211 if let Some((v, _)) = self.versions.read().get(&snapshot_id) {
212 return Ok(Some(v.clone()));
213 }
214 if let Some(f) = req_guard.get(&snapshot_id) {
215 f.clone()
216 } else {
217 let this = self.clone();
218 let f = async move {
219 let to_not_found_error = |e: BackupError| {
220 format!(
221 "failed to get meta snapshot {}: {}",
222 snapshot_id,
223 e.as_report()
224 )
225 };
226 let version_holder = if snapshot_metadata.format_version < 2 {
227 let snapshot: meta_snapshot_v1::MetaSnapshotV1 = current_store
228 .0
229 .get(snapshot_id)
230 .await
231 .map_err(to_not_found_error)?;
232 build_version_holder(snapshot)
233 } else {
234 let snapshot: meta_snapshot_v2::MetaSnapshotV2 = current_store
235 .0
236 .get(snapshot_id)
237 .await
238 .map_err(to_not_found_error)?;
239 build_version_holder(snapshot)
240 };
241 let version_clone = version_holder.0.clone();
242 this.versions.write().insert(snapshot_id, version_holder);
243 Ok(version_clone)
244 }
245 .boxed()
246 .shared();
247 req_guard.insert(snapshot_id, f.clone());
248 f
249 }
250 };
251 let result = future
252 .await
253 .map(Some)
254 .map_err(|e| HummockError::read_backup_error(e).into());
255 self.inflight_request.lock().remove(&snapshot_id);
256 result
257 }
258
259 pub async fn watch_config_change(
260 &self,
261 mut rx: tokio::sync::watch::Receiver<SystemParamsReaderRef>,
262 ) {
263 loop {
264 if rx.changed().await.is_err() {
265 break;
266 }
267 let p = rx.borrow().load();
268 let config = (
269 p.backup_storage_url().to_owned(),
270 p.backup_storage_directory().to_owned(),
271 );
272 if config == self.store.load().1 {
273 continue;
274 }
275 if let Err(e) = self.set_store(config.clone()).await {
276 tracing::warn!(
278 url = config.0, dir = config.1,
279 error = %e.as_report(),
280 "failed to update backup reader",
281 );
282 }
283 }
284 }
285}
286
287fn build_version_holder<S: Metadata>(s: MetaSnapshot<S>) -> VersionHolder {
288 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
289 (PinnedVersion::new(s.metadata.hummock_version(), tx), rx)
290}
291
292impl From<BackupError> for StorageError {
293 fn from(e: BackupError) -> Self {
294 HummockError::other(e).into()
295 }
296}