// risingwave_storage/hummock/backup_reader.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::Duration;

use arc_swap::ArcSwap;
use futures::FutureExt;
use futures::future::Shared;
use risingwave_backup::error::BackupError;
use risingwave_backup::meta_snapshot::{MetaSnapshot, Metadata};
use risingwave_backup::storage::{MetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
use risingwave_backup::{MetaSnapshotId, meta_snapshot_v1, meta_snapshot_v2};
use risingwave_common::catalog::TableId;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use thiserror_ext::AsReport;

use crate::error::{StorageError, StorageResult};
use crate::hummock::HummockError;
use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion};

40pub type BackupReaderRef = Arc<BackupReader>;
41
42type VersionHolder = (
43    PinnedVersion,
44    tokio::sync::mpsc::UnboundedReceiver<PinVersionAction>,
45);
46
47async fn create_snapshot_store(
48    config: &StoreConfig,
49    object_store_config: &ObjectStoreConfig,
50) -> StorageResult<ObjectStoreMetaSnapshotStorage> {
51    let backup_object_store = Arc::new(
52        build_remote_object_store(
53            &config.0,
54            Arc::new(ObjectStoreMetrics::unused()),
55            "Meta Backup",
56            Arc::new(object_store_config.clone()),
57        )
58        .await,
59    );
60    let store = ObjectStoreMetaSnapshotStorage::new(&config.1, backup_object_store).await?;
61    Ok(store)
62}
63
64type InflightRequest = Shared<Pin<Box<dyn Future<Output = Result<PinnedVersion, String>> + Send>>>;
65/// (url, dir)
66type StoreConfig = (String, String);
67/// `BackupReader` helps to access historical hummock versions,
68/// which are persisted in meta snapshots (aka backups).
69pub struct BackupReader {
70    versions: parking_lot::RwLock<HashMap<MetaSnapshotId, VersionHolder>>,
71    inflight_request: parking_lot::Mutex<HashMap<MetaSnapshotId, InflightRequest>>,
72    store: ArcSwap<(ObjectStoreMetaSnapshotStorage, StoreConfig)>,
73    refresh_tx: tokio::sync::mpsc::UnboundedSender<u64>,
74    object_store_config: ObjectStoreConfig,
75}
76
77impl BackupReader {
78    pub async fn new(
79        storage_url: &str,
80        storage_directory: &str,
81        object_store_config: &ObjectStoreConfig,
82    ) -> StorageResult<BackupReaderRef> {
83        let config = (storage_url.to_owned(), storage_directory.to_owned());
84        let store = create_snapshot_store(&config, object_store_config).await?;
85        tracing::info!(
86            "backup reader is initialized: url={}, dir={}",
87            config.0,
88            config.1
89        );
90        Ok(Self::with_store(
91            (store, config),
92            object_store_config.clone(),
93        ))
94    }
95
96    fn with_store(
97        store: (ObjectStoreMetaSnapshotStorage, StoreConfig),
98        object_store_config: ObjectStoreConfig,
99    ) -> BackupReaderRef {
100        let (refresh_tx, refresh_rx) = tokio::sync::mpsc::unbounded_channel();
101        let instance = Arc::new(Self {
102            store: ArcSwap::from_pointee(store),
103            versions: Default::default(),
104            inflight_request: Default::default(),
105            object_store_config,
106            refresh_tx,
107        });
108        tokio::spawn(Self::start_manifest_refresher(instance.clone(), refresh_rx));
109        instance
110    }
111
112    pub async fn unused() -> BackupReaderRef {
113        Self::with_store(
114            (
115                risingwave_backup::storage::unused().await,
116                StoreConfig::default(),
117            ),
118            ObjectStoreConfig::default(),
119        )
120    }
121
122    async fn set_store(&self, config: StoreConfig) -> StorageResult<()> {
123        let new_store = create_snapshot_store(&config, &self.object_store_config).await?;
124        tracing::info!(
125            "backup reader is updated: url={}, dir={}",
126            config.0,
127            config.1
128        );
129        self.store.store(Arc::new((new_store, config)));
130        Ok(())
131    }
132
133    /// Watches latest manifest id to keep local manifest update to date.
134    async fn start_manifest_refresher(
135        backup_reader: BackupReaderRef,
136        mut refresh_rx: tokio::sync::mpsc::UnboundedReceiver<u64>,
137    ) {
138        loop {
139            let expect_manifest_id = refresh_rx.recv().await;
140            if expect_manifest_id.is_none() {
141                break;
142            }
143            let expect_manifest_id = expect_manifest_id.unwrap();
144            // Use the same store throughout one run.
145            let current_store = backup_reader.store.load_full();
146            let previous_id = current_store.0.manifest().manifest_id;
147            if expect_manifest_id <= previous_id {
148                continue;
149            }
150            if let Err(e) = current_store.0.refresh_manifest().await {
151                // reschedule refresh request
152                tracing::warn!(error = %e.as_report(), "failed to refresh backup manifest, will retry");
153                let backup_reader_clone = backup_reader.clone();
154                tokio::spawn(async move {
155                    tokio::time::sleep(Duration::from_secs(60)).await;
156                    backup_reader_clone.try_refresh_manifest(expect_manifest_id);
157                });
158                continue;
159            }
160            // purge stale version cache
161            let manifest: HashSet<MetaSnapshotId> = current_store
162                .0
163                .manifest()
164                .snapshot_metadata
165                .iter()
166                .map(|s| s.id)
167                .collect();
168            backup_reader
169                .versions
170                .write()
171                .retain(|k, _v| manifest.contains(k));
172        }
173    }
174
175    pub fn try_refresh_manifest(self: &BackupReaderRef, min_manifest_id: u64) {
176        let _ = self.refresh_tx.send(min_manifest_id).inspect_err(|_| {
177            tracing::warn!(min_manifest_id, "failed to send refresh_manifest request")
178        });
179    }
180
181    /// Tries to get a hummock version eligible for querying `epoch`.
182    /// SSTs of the returned version are expected to be guarded by corresponding backup.
183    /// Otherwise, reading the version may encounter object store error, due to SST absence.
184    pub async fn try_get_hummock_version(
185        self: &BackupReaderRef,
186        table_id: TableId,
187        epoch: u64,
188    ) -> StorageResult<Option<PinnedVersion>> {
189        // Use the same store throughout the call.
190        let current_store = self.store.load_full();
191        // 1. check manifest to locate snapshot, if any.
192        let Some(snapshot_metadata) = current_store
193            .0
194            .manifest()
195            .snapshot_metadata
196            .iter()
197            .find(|v| {
198                if let Some(m) = v.state_table_info.get(&table_id.table_id()) {
199                    return epoch == m.committed_epoch;
200                }
201                false
202            })
203            .cloned()
204        else {
205            return Ok(None);
206        };
207        let snapshot_id = snapshot_metadata.id;
208        // 2. load hummock version of chosen snapshot.
209        let future = {
210            let mut req_guard = self.inflight_request.lock();
211            if let Some((v, _)) = self.versions.read().get(&snapshot_id) {
212                return Ok(Some(v.clone()));
213            }
214            if let Some(f) = req_guard.get(&snapshot_id) {
215                f.clone()
216            } else {
217                let this = self.clone();
218                let f = async move {
219                    let to_not_found_error = |e: BackupError| {
220                        format!(
221                            "failed to get meta snapshot {}: {}",
222                            snapshot_id,
223                            e.as_report()
224                        )
225                    };
226                    let version_holder = if snapshot_metadata.format_version < 2 {
227                        let snapshot: meta_snapshot_v1::MetaSnapshotV1 = current_store
228                            .0
229                            .get(snapshot_id)
230                            .await
231                            .map_err(to_not_found_error)?;
232                        build_version_holder(snapshot)
233                    } else {
234                        let snapshot: meta_snapshot_v2::MetaSnapshotV2 = current_store
235                            .0
236                            .get(snapshot_id)
237                            .await
238                            .map_err(to_not_found_error)?;
239                        build_version_holder(snapshot)
240                    };
241                    let version_clone = version_holder.0.clone();
242                    this.versions.write().insert(snapshot_id, version_holder);
243                    Ok(version_clone)
244                }
245                .boxed()
246                .shared();
247                req_guard.insert(snapshot_id, f.clone());
248                f
249            }
250        };
251        let result = future
252            .await
253            .map(Some)
254            .map_err(|e| HummockError::read_backup_error(e).into());
255        self.inflight_request.lock().remove(&snapshot_id);
256        result
257    }
258
259    pub async fn watch_config_change(
260        &self,
261        mut rx: tokio::sync::watch::Receiver<SystemParamsReaderRef>,
262    ) {
263        loop {
264            if rx.changed().await.is_err() {
265                break;
266            }
267            let p = rx.borrow().load();
268            let config = (
269                p.backup_storage_url().to_owned(),
270                p.backup_storage_directory().to_owned(),
271            );
272            if config == self.store.load().1 {
273                continue;
274            }
275            if let Err(e) = self.set_store(config.clone()).await {
276                // Retry is driven by periodic system params notification.
277                tracing::warn!(
278                    url = config.0, dir = config.1,
279                    error = %e.as_report(),
280                    "failed to update backup reader",
281                );
282            }
283        }
284    }
285}
286
287fn build_version_holder<S: Metadata>(s: MetaSnapshot<S>) -> VersionHolder {
288    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
289    (PinnedVersion::new(s.metadata.hummock_version(), tx), rx)
290}
291
292impl From<BackupError> for StorageError {
293    fn from(e: BackupError) -> Self {
294        HummockError::other(e).into()
295    }
296}