// risingwave_storage/hummock/sstable/sstable_object_id_manager.rs
use std::collections::VecDeque;
use std::ops::DerefMut;
use std::sync::Arc;
use parking_lot::Mutex;
use risingwave_hummock_sdk::{HummockSstableObjectId, SstObjectIdRange};
use risingwave_pb::hummock::GetNewSstIdsRequest;
use risingwave_rpc_client::{GrpcCompactorProxyClient, HummockMetaClient};
use sync_point::sync_point;
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
use crate::hummock::{HummockError, HummockResult};
/// Reference-counted handle to a [`SstableObjectIdManager`].
pub type SstableObjectIdManagerRef = Arc<SstableObjectIdManager>;
use dyn_clone::DynClone;
/// Asynchronous source of SST object ids.
///
/// Implementors must be `Send + Sync` and clonable through [`DynClone`].
#[async_trait::async_trait]
pub trait GetObjectId: DynClone + Send + Sync {
/// Yields one SST object id, or an error if none could be obtained.
async fn get_new_sst_object_id(&mut self) -> HummockResult<HummockSstableObjectId>;
}
// Provides `Clone` for boxed `dyn GetObjectId` trait objects.
dyn_clone::clone_trait_object!(GetObjectId);
/// Hands out SST object ids from a locally cached id range and refills that
/// range through `hummock_meta_client` when no id can be taken from it.
pub struct SstableObjectIdManager {
/// `None` while no remote fetch is in flight. `Some(senders)` while one
/// caller is fetching: each sender belongs to a caller waiting for that
/// fetch and is sent `true` on success or `false` on failure.
wait_queue: Mutex<Option<Vec<oneshot::Sender<bool>>>>,
/// Locally cached id range from which ids are taken.
available_sst_object_ids: Mutex<SstObjectIdRange>,
/// Value passed to `get_new_sst_ids` on every remote fetch.
remote_fetch_number: u32,
/// Client used to request new id ranges.
hummock_meta_client: Arc<dyn HummockMetaClient>,
}
impl SstableObjectIdManager {
/// Creates a manager with no fetch in flight and a cached range whose start
/// and end are both `HummockSstableObjectId::MIN`.
///
/// `remote_fetch_number` is the value passed to `get_new_sst_ids` on each
/// remote fetch.
pub fn new(hummock_meta_client: Arc<dyn HummockMetaClient>, remote_fetch_number: u32) -> Self {
Self {
wait_queue: Default::default(),
available_sst_object_ids: Mutex::new(SstObjectIdRange::new(
HummockSstableObjectId::MIN,
HummockSstableObjectId::MIN,
)),
remote_fetch_number,
hummock_meta_client,
}
}
/// Applies `f` to the cached id range until it yields an id, refilling the
/// range from the meta client whenever `f` returns `None`.
///
/// At most one caller (the "leader") performs the remote fetch at a time;
/// concurrent callers (the "followers") park on a oneshot channel and retry
/// once the leader has notified them.
///
/// # Errors
///
/// Returns the leader's error when the remote fetch fails or when the newly
/// fetched range starts before the end of the currently cached range.
/// Followers do not receive that error directly; they simply retry.
///
/// # Panics
///
/// Panics if the spawned fetch task cannot be joined (the `unwrap` on the
/// join result).
async fn map_next_sst_object_id<F>(
self: &Arc<Self>,
f: F,
) -> HummockResult<HummockSstableObjectId>
where
F: Fn(&mut SstObjectIdRange) -> Option<HummockSstableObjectId>,
{
loop {
// Fast path: try the cached range without touching the wait queue.
if let Some(new_id) = f(self.available_sst_object_ids.lock().deref_mut()) {
return Ok(new_id);
}
// Slow path: decide, under the wait-queue lock, whether to wait for an
// in-flight fetch or to start one.
let waiter = {
let mut guard = self.wait_queue.lock();
// Re-check while holding the wait-queue lock: a fetch may have refilled
// the range between the fast-path check and acquiring this lock.
if let Some(new_id) = f(self.available_sst_object_ids.lock().deref_mut()) {
return Ok(new_id);
}
let wait_queue = guard.deref_mut();
if let Some(wait_queue) = wait_queue {
// A fetch is already in flight: register as a follower.
let (tx, rx) = oneshot::channel();
wait_queue.push(tx);
Some(rx)
} else {
// No fetch in flight: install an empty queue, which marks this caller
// as the leader.
*wait_queue = Some(vec![]);
None
}
};
if let Some(waiter) = waiter {
sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_FOLLOWER");
// The outcome (success flag or a dropped sender) is deliberately
// ignored; the loop retries in every case.
let _ = waiter.await;
continue;
}
sync_point!("MAP_NEXT_SST_OBJECT_ID.AS_LEADER");
sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FETCH");
// The fetch runs in its own task on a clone of the `Arc`.
// NOTE(review): presumably so the fetch and the waiter notification still
// complete if this future is dropped mid-flight — confirm.
let this = self.clone();
tokio::spawn(async move {
let new_sst_ids = match this
.hummock_meta_client
.get_new_sst_ids(this.remote_fetch_number)
.await
.map_err(HummockError::meta_error)
{
Ok(new_sst_ids) => new_sst_ids,
Err(err) => {
// Wake the followers and reset the queue before reporting failure.
this.notify_waiters(false);
return Err(err);
}
};
sync_point!("MAP_NEXT_SST_OBJECT_ID.AFTER_FETCH");
sync_point!("MAP_NEXT_SST_OBJECT_ID.BEFORE_FILL_CACHE");
// Replace the cached range, refusing a range that starts before the end
// of the current one.
let result = {
let mut guard = this.available_sst_object_ids.lock();
let available_sst_object_ids = guard.deref_mut();
if new_sst_ids.start_id < available_sst_object_ids.end_id {
Err(HummockError::meta_error(format!(
"SST id moves backwards. new {} < old {}",
new_sst_ids.start_id, available_sst_object_ids.end_id
)))
} else {
*available_sst_object_ids = new_sst_ids;
Ok(())
}
};
// The range lock is released before the followers are woken.
this.notify_waiters(result.is_ok());
result
})
.await
.unwrap()?;
// On success, fall through to the next iteration and retry `f`.
}
}
/// Takes the current wait queue, leaving `None` behind (no fetch in flight),
/// and sends `success` to every queued follower.
///
/// Send failures are ignored. Panics if the wait queue is `None`, i.e. if
/// no leader has installed a queue.
fn notify_waiters(&self, success: bool) {
let mut guard = self.wait_queue.lock();
let wait_queue = guard.deref_mut().take().unwrap();
for notify in wait_queue {
let _ = notify.send(success);
}
}
}
#[async_trait::async_trait]
impl GetObjectId for Arc<SstableObjectIdManager> {
async fn get_new_sst_object_id(&mut self) -> HummockResult<HummockSstableObjectId> {
self.map_next_sst_object_id(|available_sst_object_ids| {
available_sst_object_ids.get_next_sst_object_id()
})
.await
}
}
/// Mutable state kept behind the lock of `SharedComapctorObjectIdManager`.
struct SharedComapctorObjectIdManagerCore {
/// Ids ready to be handed out; consumed from the front.
output_object_ids: VecDeque<u64>,
/// Client used to request more ids; `None` when built via `for_test`.
client: Option<GrpcCompactorProxyClient>,
/// `number` sent in each `GetNewSstIdsRequest`.
sstable_id_remote_fetch_number: u32,
}
impl SharedComapctorObjectIdManagerCore {
    /// Builds a core that starts with `output_object_ids` and can request
    /// `sstable_id_remote_fetch_number` more ids at a time through `client`.
    pub fn new(
        output_object_ids: VecDeque<u64>,
        client: GrpcCompactorProxyClient,
        sstable_id_remote_fetch_number: u32,
    ) -> Self {
        let client = Some(client);
        Self {
            output_object_ids,
            client,
            sstable_id_remote_fetch_number,
        }
    }

    /// Builds a core without a client and with a fetch number of zero; only
    /// the supplied `output_object_ids` are available.
    pub fn for_test(output_object_ids: VecDeque<u64>) -> Self {
        let (client, sstable_id_remote_fetch_number) = (None, 0);
        Self {
            output_object_ids,
            client,
            sstable_id_remote_fetch_number,
        }
    }
}
/// Id source backed by a queue of pre-allocated ids.
///
/// The state lives behind an `Arc<tokio::sync::Mutex<..>>`, so every clone
/// shares the same core.
#[derive(Clone)]
pub struct SharedComapctorObjectIdManager {
/// Shared, lock-protected state.
core: Arc<tokio::sync::Mutex<SharedComapctorObjectIdManagerCore>>,
}
impl SharedComapctorObjectIdManager {
    /// Creates a manager seeded with `output_object_ids` that requests
    /// `sstable_id_remote_fetch_number` further ids through `client` once the
    /// seed is exhausted.
    pub fn new(
        output_object_ids: VecDeque<u64>,
        client: GrpcCompactorProxyClient,
        sstable_id_remote_fetch_number: u32,
    ) -> Self {
        let inner = SharedComapctorObjectIdManagerCore::new(
            output_object_ids,
            client,
            sstable_id_remote_fetch_number,
        );
        let core = Arc::new(tokio::sync::Mutex::new(inner));
        Self { core }
    }

    /// Creates a manager without a client; only `output_object_ids` can be
    /// handed out.
    pub fn for_test(output_object_ids: VecDeque<u64>) -> Self {
        let inner = SharedComapctorObjectIdManagerCore::for_test(output_object_ids);
        let core = Arc::new(tokio::sync::Mutex::new(inner));
        Self { core }
    }
}
#[async_trait::async_trait]
impl GetObjectId for SharedComapctorObjectIdManager {
    /// Returns the next pre-allocated object id, requesting a new batch over
    /// RPC when the local queue is empty.
    ///
    /// The core lock is held for the whole call, including the RPC, so
    /// concurrent callers on clones of this manager are served one at a time.
    ///
    /// # Errors
    ///
    /// Returns an error when the RPC fails, or when the response carries an
    /// empty or inverted id range (`start_id >= end_id`). In the latter case
    /// `start_id` is not part of the granted half-open range
    /// `[start_id, end_id)` and must not be handed out.
    ///
    /// # Panics
    ///
    /// Panics if the queue is empty and no client is configured (a manager
    /// built with `for_test`).
    async fn get_new_sst_object_id(&mut self) -> HummockResult<HummockSstableObjectId> {
        let mut guard = self.core.lock().await;
        let core = guard.deref_mut();

        // Fast path: serve from the locally queued ids.
        if let Some(first_element) = core.output_object_ids.pop_front() {
            return Ok(first_element);
        }

        tracing::warn!("The pre-allocated object ids are used up, and new object id are obtained through RPC.");
        let request = GetNewSstIdsRequest {
            number: core.sstable_id_remote_fetch_number,
        };
        let response = core
            .client
            .as_mut()
            .expect("GrpcCompactorProxyClient is None")
            .get_new_sst_ids(request)
            .await
            .map_err(|e| {
                HummockError::other(format!("Fail to get new sst id: {}", e.as_report()))
            })?;

        let resp = response.into_inner();
        let (start_id, end_id) = (resp.start_id, resp.end_id);
        // An empty range grants no ids at all; returning `start_id` anyway would
        // hand out an id that was never allocated to this caller.
        if start_id >= end_id {
            return Err(HummockError::other(format!(
                "Fail to get new sst id: received empty id range [{}, {})",
                start_id, end_id
            )));
        }

        // Hand out the first id now and queue the rest of the batch.
        core.output_object_ids.extend((start_id + 1)..end_id);
        Ok(start_id)
    }
}