// risingwave_storage/hummock/backup_reader.rs

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use arc_swap::ArcSwap;
use futures::future::Shared;
use futures::FutureExt;
use risingwave_backup::error::BackupError;
use risingwave_backup::meta_snapshot::{MetaSnapshot, Metadata};
use risingwave_backup::storage::{MetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
use risingwave_backup::{meta_snapshot_v1, meta_snapshot_v2, MetaSnapshotId};
use risingwave_common::catalog::TableId;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use thiserror_ext::AsReport;

use crate::error::{StorageError, StorageResult};
use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion};
use crate::hummock::HummockError;

pub type BackupReaderRef = Arc<BackupReader>;

type VersionHolder = (
    PinnedVersion,
    tokio::sync::mpsc::UnboundedReceiver<PinVersionAction>,
);

async fn create_snapshot_store(
    config: &StoreConfig,
    object_store_config: &ObjectStoreConfig,
) -> StorageResult<ObjectStoreMetaSnapshotStorage> {
    let backup_object_store = Arc::new(
        build_remote_object_store(
            &config.0,
            Arc::new(ObjectStoreMetrics::unused()),
            "Meta Backup",
            Arc::new(object_store_config.clone()),
        )
        .await,
    );
    let store = ObjectStoreMetaSnapshotStorage::new(&config.1, backup_object_store).await?;
    Ok(store)
}

type InflightRequest = Shared<Pin<Box<dyn Future<Output = Result<PinnedVersion, String>> + Send>>>;

/// (backup storage url, backup storage directory)
type StoreConfig = (String, String);

/// `BackupReader` provides access to hummock versions that are persisted in meta
/// snapshots (i.e. backups).
pub struct BackupReader {
    /// Hummock versions extracted from backups, keyed by meta snapshot id.
    versions: parking_lot::RwLock<HashMap<MetaSnapshotId, VersionHolder>>,
    /// In-flight reads of meta snapshots, used to share one read among concurrent requests.
    inflight_request: parking_lot::Mutex<HashMap<MetaSnapshotId, InflightRequest>>,
    /// The backup storage in use, together with the config it was built from.
    store: ArcSwap<(ObjectStoreMetaSnapshotStorage, StoreConfig)>,
    refresh_tx: tokio::sync::mpsc::UnboundedSender<u64>,
    object_store_config: ObjectStoreConfig,
}

impl BackupReader {
    pub async fn new(
        storage_url: &str,
        storage_directory: &str,
        object_store_config: &ObjectStoreConfig,
    ) -> StorageResult<BackupReaderRef> {
        let config = (storage_url.to_string(), storage_directory.to_string());
        let store = create_snapshot_store(&config, object_store_config).await?;
        tracing::info!(
            "backup reader is initialized: url={}, dir={}",
            config.0,
            config.1
        );
        Ok(Self::with_store(
            (store, config),
            object_store_config.clone(),
        ))
    }

    fn with_store(
        store: (ObjectStoreMetaSnapshotStorage, StoreConfig),
        object_store_config: ObjectStoreConfig,
    ) -> BackupReaderRef {
        let (refresh_tx, refresh_rx) = tokio::sync::mpsc::unbounded_channel();
        let instance = Arc::new(Self {
            store: ArcSwap::from_pointee(store),
            versions: Default::default(),
            inflight_request: Default::default(),
            object_store_config,
            refresh_tx,
        });
        tokio::spawn(Self::start_manifest_refresher(instance.clone(), refresh_rx));
        instance
    }

    pub async fn unused() -> BackupReaderRef {
        Self::with_store(
            (
                risingwave_backup::storage::unused().await,
                StoreConfig::default(),
            ),
            ObjectStoreConfig::default(),
        )
    }

    async fn set_store(&self, config: StoreConfig) -> StorageResult<()> {
        let new_store = create_snapshot_store(&config, &self.object_store_config).await?;
        tracing::info!(
            "backup reader is updated: url={}, dir={}",
            config.0,
            config.1
        );
        self.store.store(Arc::new((new_store, config)));
        Ok(())
    }

    /// Refreshes the manifest of the current backup storage whenever a newer
    /// `manifest_id` is requested via `try_refresh_manifest`, and evicts cached
    /// versions whose snapshots are no longer listed in the manifest.
    async fn start_manifest_refresher(
        backup_reader: BackupReaderRef,
        mut refresh_rx: tokio::sync::mpsc::UnboundedReceiver<u64>,
    ) {
        loop {
            let expect_manifest_id = refresh_rx.recv().await;
            if expect_manifest_id.is_none() {
                break;
            }
            let expect_manifest_id = expect_manifest_id.unwrap();
            let current_store = backup_reader.store.load_full();
            let previous_id = current_store.0.manifest().manifest_id;
            if expect_manifest_id <= previous_id {
                // The current manifest is already new enough.
                continue;
            }
            if let Err(e) = current_store.0.refresh_manifest().await {
                tracing::warn!(error = %e.as_report(), "failed to refresh backup manifest, will retry");
                // Schedule a delayed retry of the same request.
                let backup_reader_clone = backup_reader.clone();
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_secs(60)).await;
                    backup_reader_clone.try_refresh_manifest(expect_manifest_id);
                });
                continue;
            }
            // Purge cached versions whose snapshots are no longer listed in the manifest.
            let manifest: HashSet<MetaSnapshotId> = current_store
                .0
                .manifest()
                .snapshot_metadata
                .iter()
                .map(|s| s.id)
                .collect();
            backup_reader
                .versions
                .write()
                .retain(|k, _v| manifest.contains(k));
        }
    }

    pub fn try_refresh_manifest(self: &BackupReaderRef, min_manifest_id: u64) {
        let _ = self.refresh_tx.send(min_manifest_id).inspect_err(|_| {
            tracing::warn!(min_manifest_id, "failed to send refresh_manifest request")
        });
    }

    /// Tries to get a hummock version from backups that contains `table_id` at exactly
    /// the given committed `epoch`. Returns `Ok(None)` if no backup matches.
    pub async fn try_get_hummock_version(
        self: &BackupReaderRef,
        table_id: TableId,
        epoch: u64,
    ) -> StorageResult<Option<PinnedVersion>> {
        let current_store = self.store.load_full();
        // 1. Find a matching meta snapshot in the manifest.
        let Some(snapshot_metadata) = current_store
            .0
            .manifest()
            .snapshot_metadata
            .iter()
            .find(|v| {
                if let Some(m) = v.state_table_info.get(&table_id.table_id()) {
                    return epoch == m.committed_epoch;
                }
                false
            })
            .cloned()
        else {
            return Ok(None);
        };
        let snapshot_id = snapshot_metadata.id;
        // 2. Load the hummock version from the chosen meta snapshot, sharing one read
        //    among concurrent requests for the same snapshot.
        let future = {
            let mut req_guard = self.inflight_request.lock();
            if let Some((v, _)) = self.versions.read().get(&snapshot_id) {
                return Ok(Some(v.clone()));
            }
            if let Some(f) = req_guard.get(&snapshot_id) {
                f.clone()
            } else {
                let this = self.clone();
                let f = async move {
                    let to_not_found_error = |e: BackupError| {
                        format!(
                            "failed to get meta snapshot {}: {}",
                            snapshot_id,
                            e.as_report()
                        )
                    };
                    let version_holder = if snapshot_metadata.format_version < 2 {
                        let snapshot: meta_snapshot_v1::MetaSnapshotV1 = current_store
                            .0
                            .get(snapshot_id)
                            .await
                            .map_err(to_not_found_error)?;
                        build_version_holder(snapshot)
                    } else {
                        let snapshot: meta_snapshot_v2::MetaSnapshotV2 = current_store
                            .0
                            .get(snapshot_id)
                            .await
                            .map_err(to_not_found_error)?;
                        build_version_holder(snapshot)
                    };
                    let version_clone = version_holder.0.clone();
                    this.versions.write().insert(snapshot_id, version_holder);
                    Ok(version_clone)
                }
                .boxed()
                .shared();
                req_guard.insert(snapshot_id, f.clone());
                f
            }
        };
        let result = future
            .await
            .map(Some)
            .map_err(|e| HummockError::read_backup_error(e).into());
        self.inflight_request.lock().remove(&snapshot_id);
        result
    }

    /// Watches system-parameter changes and rebuilds the backup storage whenever the
    /// backup storage url or directory changes.
    pub async fn watch_config_change(
        &self,
        mut rx: tokio::sync::watch::Receiver<SystemParamsReaderRef>,
    ) {
        loop {
            if rx.changed().await.is_err() {
                break;
            }
            let p = rx.borrow().load();
            let config = (
                p.backup_storage_url().to_string(),
                p.backup_storage_directory().to_string(),
            );
            if config == self.store.load().1 {
                continue;
            }
            if let Err(e) = self.set_store(config.clone()).await {
                // The stored config is left unchanged on failure, so the update will be
                // retried on the next notification.
                tracing::warn!(
                    url = config.0,
                    dir = config.1,
                    error = %e.as_report(),
                    "failed to update backup reader",
                );
            }
        }
    }
}

fn build_version_holder<S: Metadata>(s: MetaSnapshot<S>) -> VersionHolder {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    (PinnedVersion::new(s.metadata.hummock_version(), tx), rx)
}

impl From<BackupError> for StorageError {
    fn from(e: BackupError) -> Self {
        HummockError::other(e).into()
    }
}
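
// A minimal usage sketch (illustrative only; the surrounding wiring and the names
// `reported_manifest_id`, `table_id` and `epoch` are hypothetical, not part of this
// module): build a reader from the backup storage settings, keep its manifest fresh,
// then resolve the pinned version that backs a (table, committed epoch) pair.
//
//     let reader = BackupReader::new(
//         "s3://backup-bucket", // hypothetical backup_storage_url
//         "backup",             // hypothetical backup_storage_directory
//         &ObjectStoreConfig::default(),
//     )
//     .await?;
//     reader.try_refresh_manifest(reported_manifest_id);
//     if let Some(version) = reader.try_get_hummock_version(table_id, epoch).await? {
//         // Read from the backed-up version instead of the live one.
//     }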