risingwave_storage/hummock/event_handler/
mod.rsuse std::collections::{HashMap, HashSet};
use std::sync::Arc;
use parking_lot::{RwLock, RwLockReadGuard};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
use crate::hummock::HummockResult;
use crate::mem_table::ImmutableMemtable;
use crate::store::SealCurrentEpochOptions;
pub mod hummock_event_handler;
pub mod refiller;
pub mod uploader;
pub use hummock_event_handler::HummockEventHandler;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use super::store::version::HummockReadVersion;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::uploader::SyncedData;
#[derive(Debug)]
pub struct BufferWriteRequest {
pub batch: SharedBufferBatch,
pub epoch: HummockEpoch,
pub grant_sender: oneshot::Sender<()>,
}
#[derive(Debug)]
pub enum HummockVersionUpdate {
VersionDeltas(Vec<HummockVersionDelta>),
PinnedVersion(Box<HummockVersion>),
}
pub enum HummockEvent {
BufferMayFlush,
SyncEpoch {
sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
},
Clear(oneshot::Sender<()>, Option<HashSet<TableId>>),
Shutdown,
ImmToUploader {
instance_id: SharedBufferBatchId,
imm: ImmutableMemtable,
},
StartEpoch {
epoch: HummockEpoch,
table_ids: HashSet<TableId>,
},
InitEpoch {
instance_id: LocalInstanceId,
init_epoch: HummockEpoch,
},
LocalSealEpoch {
instance_id: LocalInstanceId,
next_epoch: HummockEpoch,
opts: SealCurrentEpochOptions,
},
#[cfg(any(test, feature = "test"))]
FlushEvent(oneshot::Sender<()>),
RegisterReadVersion {
table_id: TableId,
new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>,
is_replicated: bool,
vnodes: Arc<Bitmap>,
},
DestroyReadVersion {
instance_id: LocalInstanceId,
},
GetMinUncommittedSstId {
result_tx: oneshot::Sender<Option<HummockSstableObjectId>>,
},
}
impl HummockEvent {
fn to_debug_string(&self) -> String {
match self {
HummockEvent::BufferMayFlush => "BufferMayFlush".to_string(),
HummockEvent::SyncEpoch {
sync_result_sender: _,
sync_table_epochs,
} => format!("AwaitSyncEpoch epoch {:?}", sync_table_epochs),
HummockEvent::Clear(_, table_ids) => {
format!("Clear {:?}", table_ids)
}
HummockEvent::Shutdown => "Shutdown".to_string(),
HummockEvent::StartEpoch { epoch, table_ids } => {
format!("StartEpoch {} {:?}", epoch, table_ids)
}
HummockEvent::InitEpoch {
instance_id,
init_epoch,
} => {
format!("InitEpoch {} {}", instance_id, init_epoch)
}
HummockEvent::ImmToUploader { instance_id, imm } => {
format!("ImmToUploader {} {}", instance_id, imm.batch_id())
}
HummockEvent::LocalSealEpoch {
instance_id,
next_epoch,
opts,
} => {
format!(
"LocalSealEpoch next_epoch: {}, instance_id: {}, opts: {:?}",
next_epoch, instance_id, opts
)
}
HummockEvent::RegisterReadVersion {
table_id,
new_read_version_sender: _,
is_replicated,
vnodes: _,
} => format!(
"RegisterReadVersion table_id {:?}, is_replicated: {:?}",
table_id, is_replicated
),
HummockEvent::DestroyReadVersion { instance_id } => {
format!("DestroyReadVersion instance_id {:?}", instance_id)
}
#[cfg(any(test, feature = "test"))]
HummockEvent::FlushEvent(_) => "FlushEvent".to_string(),
HummockEvent::GetMinUncommittedSstId { .. } => "GetMinSpilledSstId".to_string(),
}
}
}
impl std::fmt::Debug for HummockEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HummockEvent")
.field("debug_string", &self.to_debug_string())
.finish()
}
}
pub type LocalInstanceId = u64;
pub const TEST_LOCAL_INSTANCE_ID: LocalInstanceId = 233;
pub type HummockReadVersionRef = Arc<RwLock<HummockReadVersion>>;
pub type ReadVersionMappingType = HashMap<TableId, HashMap<LocalInstanceId, HummockReadVersionRef>>;
pub type ReadOnlyReadVersionMapping = ReadOnlyRwLockRef<ReadVersionMappingType>;
pub struct ReadOnlyRwLockRef<T>(Arc<RwLock<T>>);
impl<T> Clone for ReadOnlyRwLockRef<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T> ReadOnlyRwLockRef<T> {
pub fn new(inner: Arc<RwLock<T>>) -> Self {
Self(inner)
}
pub fn read(&self) -> RwLockReadGuard<'_, T> {
self.0.read()
}
}
pub struct LocalInstanceGuard {
pub table_id: TableId,
pub instance_id: LocalInstanceId,
event_sender: Option<HummockEventSender>,
}
impl Drop for LocalInstanceGuard {
fn drop(&mut self) {
if let Some(sender) = self.event_sender.take() {
sender
.send(HummockEvent::DestroyReadVersion {
instance_id: self.instance_id,
})
.unwrap_or_else(|err| {
tracing::debug!(
error = %err.as_report(),
table_id = %self.table_id,
instance_id = self.instance_id,
"LocalInstanceGuard Drop SendError",
)
})
}
}
}