risingwave_storage/hummock/
backup_reader.rs1use std::collections::{HashMap, HashSet};
16use std::future::Future;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::time::Duration;
20
21use arc_swap::ArcSwap;
22use futures::FutureExt;
23use futures::future::Shared;
24use risingwave_backup::error::BackupError;
25use risingwave_backup::meta_snapshot::{MetaSnapshot, Metadata};
26use risingwave_backup::storage::{MetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
27use risingwave_backup::{MetaSnapshotId, meta_snapshot_v1, meta_snapshot_v2};
28use risingwave_common::catalog::TableId;
29use risingwave_common::config::ObjectStoreConfig;
30use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
31use risingwave_common::system_param::reader::SystemParamsRead;
32use risingwave_object_store::object::build_remote_object_store;
33use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
34use thiserror_ext::AsReport;
35
36use crate::error::{StorageError, StorageResult};
37use crate::hummock::HummockError;
38use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion};
39
40pub type BackupReaderRef = Arc<BackupReader>;
41
42type VersionHolder = (
43 PinnedVersion,
44 tokio::sync::mpsc::UnboundedReceiver<PinVersionAction>,
45);
46
47async fn create_snapshot_store(
48 config: &StoreConfig,
49 object_store_config: &ObjectStoreConfig,
50) -> StorageResult<ObjectStoreMetaSnapshotStorage> {
51 let backup_object_store = Arc::new(
52 build_remote_object_store(
53 &config.0,
54 Arc::new(ObjectStoreMetrics::unused()),
55 "Meta Backup",
56 Arc::new(object_store_config.clone()),
57 )
58 .await,
59 );
60 let store = ObjectStoreMetaSnapshotStorage::new(&config.1, backup_object_store).await?;
61 Ok(store)
62}
63
64type InflightRequest = Shared<Pin<Box<dyn Future<Output = Result<PinnedVersion, String>> + Send>>>;
/// Location of the backup storage as `(url, directory)`.
type StoreConfig = (String, String);
67pub struct BackupReader {
70 versions: parking_lot::RwLock<HashMap<MetaSnapshotId, VersionHolder>>,
71 inflight_request: parking_lot::Mutex<HashMap<MetaSnapshotId, InflightRequest>>,
72 store: ArcSwap<(ObjectStoreMetaSnapshotStorage, StoreConfig)>,
73 refresh_tx: tokio::sync::mpsc::UnboundedSender<u64>,
74 object_store_config: ObjectStoreConfig,
75}
76
77impl BackupReader {
78 pub async fn new(
79 storage_url: &str,
80 storage_directory: &str,
81 object_store_config: &ObjectStoreConfig,
82 ) -> StorageResult<BackupReaderRef> {
83 let config = (storage_url.to_owned(), storage_directory.to_owned());
84 let store = create_snapshot_store(&config, object_store_config).await?;
85 tracing::info!(
86 "backup reader is initialized: url={}, dir={}",
87 config.0,
88 config.1
89 );
90 Ok(Self::with_store(
91 (store, config),
92 object_store_config.clone(),
93 ))
94 }
95
96 fn with_store(
97 store: (ObjectStoreMetaSnapshotStorage, StoreConfig),
98 object_store_config: ObjectStoreConfig,
99 ) -> BackupReaderRef {
100 let (refresh_tx, refresh_rx) = tokio::sync::mpsc::unbounded_channel();
101 let instance = Arc::new(Self {
102 store: ArcSwap::from_pointee(store),
103 versions: Default::default(),
104 inflight_request: Default::default(),
105 object_store_config,
106 refresh_tx,
107 });
108 tokio::spawn(Self::start_manifest_refresher(instance.clone(), refresh_rx));
109 instance
110 }
111
112 pub async fn unused() -> BackupReaderRef {
113 Self::with_store(
114 (
115 risingwave_backup::storage::unused().await,
116 StoreConfig::default(),
117 ),
118 ObjectStoreConfig::default(),
119 )
120 }
121
122 async fn set_store(&self, config: StoreConfig) -> StorageResult<()> {
123 let new_store = create_snapshot_store(&config, &self.object_store_config).await?;
124 tracing::info!(
125 "backup reader is updated: url={}, dir={}",
126 config.0,
127 config.1
128 );
129 self.store.store(Arc::new((new_store, config)));
130 Ok(())
131 }
132
133 async fn start_manifest_refresher(
135 backup_reader: BackupReaderRef,
136 mut refresh_rx: tokio::sync::mpsc::UnboundedReceiver<u64>,
137 ) {
138 loop {
139 let expect_manifest_id = refresh_rx.recv().await;
140 if expect_manifest_id.is_none() {
141 break;
142 }
143 let expect_manifest_id = expect_manifest_id.unwrap();
144 let current_store = backup_reader.store.load_full();
146 let previous_id = current_store.0.manifest().await.manifest_id;
147 if expect_manifest_id <= previous_id {
148 continue;
149 }
150 if let Err(e) = current_store.0.refresh_manifest().await {
151 tracing::warn!(error = %e.as_report(), "failed to refresh backup manifest, will retry");
153 let backup_reader_clone = backup_reader.clone();
154 tokio::spawn(async move {
155 tokio::time::sleep(Duration::from_secs(60)).await;
156 backup_reader_clone.try_refresh_manifest(expect_manifest_id);
157 });
158 continue;
159 }
160 let manifest: HashSet<MetaSnapshotId> = current_store
162 .0
163 .manifest()
164 .await
165 .snapshot_metadata
166 .iter()
167 .map(|s| s.id)
168 .collect();
169 backup_reader
170 .versions
171 .write()
172 .retain(|k, _v| manifest.contains(k));
173 }
174 }
175
176 pub fn try_refresh_manifest(self: &BackupReaderRef, min_manifest_id: u64) {
177 let _ = self.refresh_tx.send(min_manifest_id).inspect_err(|_| {
178 tracing::warn!(min_manifest_id, "failed to send refresh_manifest request")
179 });
180 }
181
182 pub async fn try_get_hummock_version(
186 self: &BackupReaderRef,
187 table_id: TableId,
188 epoch: u64,
189 ) -> StorageResult<Option<PinnedVersion>> {
190 let current_store = self.store.load_full();
192 let Some(snapshot_metadata) = current_store
194 .0
195 .manifest()
196 .await
197 .snapshot_metadata
198 .iter()
199 .find(|v| {
200 if let Some(m) = v.state_table_info.get(&table_id) {
201 return epoch == m.committed_epoch;
202 }
203 false
204 })
205 .cloned()
206 else {
207 return Ok(None);
208 };
209 let snapshot_id = snapshot_metadata.id;
210 let future = {
212 let mut req_guard = self.inflight_request.lock();
213 if let Some((v, _)) = self.versions.read().get(&snapshot_id) {
214 return Ok(Some(v.clone()));
215 }
216 if let Some(f) = req_guard.get(&snapshot_id) {
217 f.clone()
218 } else {
219 let this = self.clone();
220 let f = async move {
221 let to_not_found_error = |e: BackupError| {
222 format!(
223 "failed to get meta snapshot {}: {}",
224 snapshot_id,
225 e.as_report()
226 )
227 };
228 let version_holder = if snapshot_metadata.format_version < 2 {
229 let snapshot: meta_snapshot_v1::MetaSnapshotV1 = current_store
230 .0
231 .get(snapshot_id)
232 .await
233 .map_err(to_not_found_error)?;
234 build_version_holder(snapshot)
235 } else {
236 let snapshot: meta_snapshot_v2::MetaSnapshotV2 = current_store
237 .0
238 .get(snapshot_id)
239 .await
240 .map_err(to_not_found_error)?;
241 build_version_holder(snapshot)
242 };
243 let version_clone = version_holder.0.clone();
244 this.versions.write().insert(snapshot_id, version_holder);
245 Ok(version_clone)
246 }
247 .boxed()
248 .shared();
249 req_guard.insert(snapshot_id, f.clone());
250 f
251 }
252 };
253 let result = future
254 .await
255 .map(Some)
256 .map_err(|e| HummockError::read_backup_error(e).into());
257 self.inflight_request.lock().remove(&snapshot_id);
258 result
259 }
260
261 pub async fn watch_config_change(
262 &self,
263 mut rx: tokio::sync::watch::Receiver<SystemParamsReaderRef>,
264 ) {
265 loop {
266 if rx.changed().await.is_err() {
267 break;
268 }
269 let p = rx.borrow().load();
270 let config = (
271 p.backup_storage_url().to_owned(),
272 p.backup_storage_directory().to_owned(),
273 );
274 if config == self.store.load().1 {
275 continue;
276 }
277 if let Err(e) = self.set_store(config.clone()).await {
278 tracing::warn!(
280 url = config.0, dir = config.1,
281 error = %e.as_report(),
282 "failed to update backup reader",
283 );
284 }
285 }
286 }
287}
288
289fn build_version_holder<S: Metadata>(s: MetaSnapshot<S>) -> VersionHolder {
290 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
291 (PinnedVersion::new(s.metadata.hummock_version(), tx), rx)
292}
293
294impl From<BackupError> for StorageError {
295 fn from(e: BackupError) -> Self {
296 HummockError::other(e).into()
297 }
298}