use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use fail::fail_point;
use parking_lot::RwLock;
use risingwave_hummock_sdk::compact::statistics_compact_task;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockContextId};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
CancelCompactTask, CompactTaskAssignment, CompactTaskProgress, SubscribeCompactionEventResponse,
};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use crate::manager::MetaSrvEnv;
use crate::model::MetadataModelError;
use crate::MetaResult;
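
/// Shared, reference-counted handle to the [`CompactorManager`].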
pub type CompactorManagerRef = Arc<CompactorManager>;
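
// Human-readable task statuses returned by `check_tasks_status`.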
pub const TASK_RUN_TOO_LONG: &str = "running too long";
pub const TASK_NOT_FOUND: &str = "task not found";
pub const TASK_NORMAL: &str = "task is normal, please wait some time";
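
/// Handle to a compactor node's event subscription.
///
/// The meta node uses a `Compactor` to push [`SubscribeCompactionEventResponse`]s
/// (e.g. newly assigned tasks or cancellation requests) to the compactor node
/// identified by `context_id`.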
#[derive(Debug)]
pub struct Compactor {
context_id: HummockContextId,
sender: UnboundedSender<MetaResult<SubscribeCompactionEventResponse>>,
}
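
/// Latest progress report tracked for a single compact task.
///
/// `create_time` is a monotonic instant taken when tracking starts, while
/// `expire_at` and `update_at` are UNIX timestamps in seconds maintained by
/// `update_task_heartbeats`.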
struct TaskHeartbeat {
task: CompactTask,
num_ssts_sealed: u32,
num_ssts_uploaded: u32,
num_progress_key: u64,
num_pending_read_io: u64,
num_pending_write_io: u64,
create_time: Instant,
expire_at: u64,
update_at: u64,
}
impl Compactor {
pub fn new(
context_id: HummockContextId,
sender: UnboundedSender<MetaResult<SubscribeCompactionEventResponse>>,
) -> Self {
Self { context_id, sender }
}
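
    /// Sends `event` to the compactor node. The `compaction_send_task_fail` fail
    /// point lets tests inject a send failure here.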
pub fn send_event(&self, event: ResponseEvent) -> MetaResult<()> {
fail_point!("compaction_send_task_fail", |_| Err(anyhow::anyhow!(
"compaction_send_task_fail"
)
.into()));
self.sender
.send(Ok(SubscribeCompactionEventResponse {
event: Some(event),
create_at: SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Clock may have gone backwards")
.as_millis() as u64,
}))
.map_err(|e| anyhow::anyhow!(e))?;
Ok(())
}
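
    /// Asks the compactor to cancel the task identified by `task_id`.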
pub fn cancel_task(&self, task_id: u64) -> MetaResult<()> {
self.sender
.send(Ok(SubscribeCompactionEventResponse {
event: Some(ResponseEvent::CancelCompactTask(CancelCompactTask {
context_id: self.context_id,
task_id,
})),
create_at: SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Clock may have gone backwards")
.as_millis() as u64,
}))
.map_err(|e| anyhow::anyhow!(e))?;
Ok(())
}
    pub fn cancel_tasks(&self, task_ids: &[u64]) -> MetaResult<()> {
for task_id in task_ids {
self.cancel_task(*task_id)?;
}
Ok(())
}
pub fn context_id(&self) -> HummockContextId {
self.context_id
}
}
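
/// `CompactorManagerInner` maintains the set of registered compactor sessions and
/// the heartbeat state of all running compact tasks. It is wrapped in a `RwLock`
/// by [`CompactorManager`], which is the handle used by the rest of the meta node.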
pub struct CompactorManagerInner {
pub task_expired_seconds: u64,
pub heartbeat_expired_seconds: u64,
task_heartbeats: HashMap<HummockCompactionTaskId, TaskHeartbeat>,
pub compactor_map: HashMap<HummockContextId, Arc<Compactor>>,
}
impl CompactorManagerInner {
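    /// Restores task heartbeats from the compact task assignments persisted in the
    /// meta store, so tasks that were running before a meta restart keep being tracked.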
pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
use risingwave_meta_model::compaction_task;
use sea_orm::EntityTrait;
let task_assignment: Vec<CompactTaskAssignment> = compaction_task::Entity::find()
.all(&env.meta_store_ref().conn)
.await
.map_err(MetadataModelError::from)?
.into_iter()
.map(Into::into)
.collect();
let mut manager = Self {
task_expired_seconds: env.opts.compaction_task_max_progress_interval_secs,
heartbeat_expired_seconds: env.opts.compaction_task_max_heartbeat_interval_secs,
task_heartbeats: Default::default(),
compactor_map: Default::default(),
};
        task_assignment.into_iter().for_each(|assignment| {
            // A persisted assignment always carries its task.
            let task = assignment.compact_task.expect("task assignment without compact task");
            manager.initiate_task_heartbeat(CompactTask::from(task));
        });
Ok(manager)
}
pub fn for_test() -> Self {
Self {
task_expired_seconds: 1,
heartbeat_expired_seconds: 1,
task_heartbeats: Default::default(),
compactor_map: Default::default(),
}
}
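
    /// Picks a registered compactor uniformly at random, or returns `None` if no
    /// compactor is currently registered.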
pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
use rand::Rng;
if self.compactor_map.is_empty() {
return None;
}
let rand_index = rand::thread_rng().gen_range(0..self.compactor_map.len());
let compactor = self.compactor_map.values().nth(rand_index).unwrap().clone();
Some(compactor)
}
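
    /// Registers a compactor session for `context_id` and returns the receiver half
    /// of its event channel. Any existing session for the same context is replaced.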
pub fn add_compactor(
&mut self,
context_id: HummockContextId,
) -> UnboundedReceiver<MetaResult<SubscribeCompactionEventResponse>> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
self.compactor_map
.insert(context_id, Arc::new(Compactor::new(context_id, tx)));
tracing::info!("Added compactor session {}", context_id);
rx
}
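
    /// Drops every registered compactor session.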
pub fn abort_all_compactors(&mut self) {
while let Some(compactor) = self.next_compactor() {
self.remove_compactor(compactor.context_id);
}
}
pub fn remove_compactor(&mut self, context_id: HummockContextId) {
self.compactor_map.remove(&context_id);
tracing::info!("Removed compactor session {}", context_id);
}
pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
self.compactor_map.get(&context_id).cloned()
}
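
    /// Reports, for each task in `tasks`, how long it has been running together with a
    /// human-readable status: [`TASK_NORMAL`], [`TASK_RUN_TOO_LONG`] (running longer
    /// than `slow_task_duration`), or [`TASK_NOT_FOUND`] if no heartbeat is tracked.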
pub fn check_tasks_status(
&self,
tasks: &[HummockCompactionTaskId],
slow_task_duration: Duration,
) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
        let task_ids: HashSet<u64> = tasks.iter().copied().collect();
let mut ret = HashMap::default();
for TaskHeartbeat {
task, create_time, ..
} in self.task_heartbeats.values()
{
            if !task_ids.contains(&task.task_id) {
continue;
}
let pending_time = create_time.elapsed();
if pending_time > slow_task_duration {
ret.insert(task.task_id, (pending_time, TASK_RUN_TOO_LONG));
} else {
ret.insert(task.task_id, (pending_time, TASK_NORMAL));
}
}
for task_id in tasks {
if !ret.contains_key(task_id) {
ret.insert(*task_id, (Duration::from_secs(0), TASK_NOT_FOUND));
}
}
ret
}
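
    /// Returns tasks whose heartbeat has not been updated within
    /// `heartbeat_expired_seconds`, i.e. candidates for cancellation.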
pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
        let heartbeat_expired_ts: u64 = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("Clock may have gone backwards")
            .as_secs()
            // Saturate to avoid underflow if the configured interval exceeds the current time.
            .saturating_sub(self.heartbeat_expired_seconds);
Self::get_heartbeat_expired_tasks_impl(&self.task_heartbeats, heartbeat_expired_ts)
}
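
    /// Besides collecting expired tasks, this also logs tasks that have been running
    /// longer than `MAX_TASK_DURATION_SEC` to help diagnose stuck compactions.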
fn get_heartbeat_expired_tasks_impl(
task_heartbeats: &HashMap<HummockCompactionTaskId, TaskHeartbeat>,
heartbeat_expired_ts: u64,
) -> Vec<CompactTask> {
let mut cancellable_tasks = vec![];
        // 2700 seconds = 45 minutes; tasks running longer than this are logged.
        const MAX_TASK_DURATION_SEC: u64 = 2700;
for TaskHeartbeat {
expire_at,
task,
create_time,
num_ssts_sealed,
num_ssts_uploaded,
num_progress_key,
num_pending_read_io,
num_pending_write_io,
update_at,
} in task_heartbeats.values()
{
if *update_at < heartbeat_expired_ts {
cancellable_tasks.push(task.clone());
}
let task_duration_too_long = create_time.elapsed().as_secs() > MAX_TASK_DURATION_SEC;
if task_duration_too_long {
let compact_task_statistics = statistics_compact_task(task);
tracing::info!(
"CompactionGroupId {} Task {} duration too long create_time {:?} expire_at {:?} num_ssts_sealed {} num_ssts_uploaded {} num_progress_key {} \
pending_read_io_count {} pending_write_io_count {} target_level {} \
base_level {} target_sub_level_id {} task_type {} compact_task_statistics {:?}",
task.compaction_group_id,
task.task_id,
create_time,
expire_at,
num_ssts_sealed,
num_ssts_uploaded,
num_progress_key,
num_pending_read_io,
num_pending_write_io,
task.target_level,
task.base_level,
task.target_sub_level_id,
task.task_type.as_str_name(),
compact_task_statistics
);
}
}
cancellable_tasks
}
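
    /// Starts tracking heartbeats for a newly assigned compact task.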
pub fn initiate_task_heartbeat(&mut self, task: CompactTask) {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Clock may have gone backwards")
.as_secs();
self.task_heartbeats.insert(
task.task_id,
TaskHeartbeat {
task,
num_ssts_sealed: 0,
num_ssts_uploaded: 0,
num_progress_key: 0,
num_pending_read_io: 0,
num_pending_write_io: 0,
create_time: Instant::now(),
expire_at: now + self.task_expired_seconds,
update_at: now,
},
);
}
    pub fn remove_task_heartbeat(&mut self, task_id: u64) {
        // The task must have been registered via `initiate_task_heartbeat`.
        self.task_heartbeats
            .remove(&task_id)
            .expect("compact task heartbeat not found");
    }
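
    /// Applies a batch of progress reports. A task's `expire_at` is pushed forward only
    /// when the report shows real progress (more SSTs sealed or uploaded, or more keys
    /// processed); tasks whose `expire_at` has already passed are returned so the caller
    /// can cancel them.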
    pub fn update_task_heartbeats(
        &mut self,
        progress_list: &[CompactTaskProgress],
    ) -> Vec<CompactTask> {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Clock may have gone backwards")
.as_secs();
let mut cancel_tasks = vec![];
for progress in progress_list {
if let Some(task_ref) = self.task_heartbeats.get_mut(&progress.task_id) {
task_ref.update_at = now;
if task_ref.num_ssts_sealed < progress.num_ssts_sealed
|| task_ref.num_ssts_uploaded < progress.num_ssts_uploaded
|| task_ref.num_progress_key < progress.num_progress_key
{
task_ref.expire_at = now + self.task_expired_seconds;
task_ref.num_ssts_sealed = progress.num_ssts_sealed;
task_ref.num_ssts_uploaded = progress.num_ssts_uploaded;
task_ref.num_progress_key = progress.num_progress_key;
}
task_ref.num_pending_read_io = progress.num_pending_read_io;
task_ref.num_pending_write_io = progress.num_pending_write_io;
if task_ref.expire_at < now {
cancel_tasks.push(task_ref.task.clone())
}
}
}
cancel_tasks
}
pub fn compactor_num(&self) -> usize {
self.compactor_map.len()
}
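
    /// Snapshots the tracked progress of every running compact task.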
pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
self.task_heartbeats
.values()
.map(|hb| CompactTaskProgress {
task_id: hb.task.task_id,
num_ssts_sealed: hb.num_ssts_sealed,
num_ssts_uploaded: hb.num_ssts_uploaded,
num_progress_key: hb.num_progress_key,
num_pending_read_io: hb.num_pending_read_io,
num_pending_write_io: hb.num_pending_write_io,
compaction_group_id: Some(hb.task.compaction_group_id),
})
.collect()
}
}
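
/// Thread-safe facade over [`CompactorManagerInner`]: every method takes the inner
/// read or write lock and delegates.
///
/// A minimal usage sketch (illustrative only, not taken from this crate's callers;
/// `ctx` stands in for a real `HummockContextId`):
///
/// ```ignore
/// let manager = CompactorManager::for_test();
/// // Register a compactor session; keep the receiver to consume pushed events.
/// let mut _rx = manager.add_compactor(ctx);
/// // Pick any registered compactor and ask it to cancel task 42.
/// if let Some(compactor) = manager.next_compactor() {
///     compactor.cancel_task(42).unwrap();
/// }
/// ```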
pub struct CompactorManager {
inner: Arc<RwLock<CompactorManagerInner>>,
}
impl CompactorManager {
pub async fn with_meta(env: MetaSrvEnv) -> MetaResult<Self> {
let inner = CompactorManagerInner::with_meta(env).await?;
Ok(Self {
inner: Arc::new(RwLock::new(inner)),
})
}
pub fn for_test() -> Self {
let inner = CompactorManagerInner::for_test();
Self {
inner: Arc::new(RwLock::new(inner)),
}
}
pub fn next_compactor(&self) -> Option<Arc<Compactor>> {
self.inner.read().next_compactor()
}
pub fn add_compactor(
&self,
context_id: HummockContextId,
) -> UnboundedReceiver<MetaResult<SubscribeCompactionEventResponse>> {
self.inner.write().add_compactor(context_id)
}
pub fn abort_all_compactors(&self) {
self.inner.write().abort_all_compactors();
}
pub fn remove_compactor(&self, context_id: HummockContextId) {
self.inner.write().remove_compactor(context_id)
}
pub fn get_compactor(&self, context_id: HummockContextId) -> Option<Arc<Compactor>> {
self.inner.read().get_compactor(context_id)
}
pub fn check_tasks_status(
&self,
tasks: &[HummockCompactionTaskId],
slow_task_duration: Duration,
) -> HashMap<HummockCompactionTaskId, (Duration, &'static str)> {
self.inner
.read()
.check_tasks_status(tasks, slow_task_duration)
}
pub fn get_heartbeat_expired_tasks(&self) -> Vec<CompactTask> {
self.inner.read().get_heartbeat_expired_tasks()
}
pub fn initiate_task_heartbeat(&self, task: CompactTask) {
self.inner.write().initiate_task_heartbeat(task);
}
pub fn remove_task_heartbeat(&self, task_id: u64) {
self.inner.write().remove_task_heartbeat(task_id);
}
    pub fn update_task_heartbeats(
        &self,
        progress_list: &[CompactTaskProgress],
    ) -> Vec<CompactTask> {
self.inner.write().update_task_heartbeats(progress_list)
}
pub fn compactor_num(&self) -> usize {
self.inner.read().compactor_num()
}
pub fn get_progress(&self) -> Vec<CompactTaskProgress> {
self.inner.read().get_progress()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_pb::hummock::CompactTaskProgress;
use risingwave_rpc_client::HummockMetaClient;
use crate::hummock::compaction::selector::default_compaction_selector;
use crate::hummock::test_utils::{
add_ssts, register_table_ids_to_compaction_group, setup_compute_env,
};
use crate::hummock::{CompactorManager, MockHummockMetaClient};
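
    /// A compact task is created against a real `HummockManager`; a fresh
    /// `CompactorManager` is then rebuilt from the meta store and must see that
    /// task's heartbeat expire until progress is reported for it.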
#[tokio::test]
async fn test_compactor_manager() {
let (env, context_id) = {
let (env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await;
let context_id = worker_id as _;
let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(
MockHummockMetaClient::new(hummock_manager.clone(), context_id),
);
let compactor_manager = hummock_manager.compactor_manager_ref_for_test();
register_table_ids_to_compaction_group(
hummock_manager.as_ref(),
&[1],
StaticCompactionGroupId::StateDefault.into(),
)
.await;
let _sst_infos =
add_ssts(1, hummock_manager.as_ref(), hummock_meta_client.clone()).await;
let _receiver = compactor_manager.add_compactor(context_id);
hummock_manager
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_compaction_selector(),
)
.await
.unwrap()
.unwrap();
(env, context_id)
};
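
        // Restore task heartbeats from the meta store. Compactor sessions are not
        // persisted, so none are registered yet.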
let compactor_manager = CompactorManager::with_meta(env).await.unwrap();
assert_eq!(compactor_manager.compactor_num(), 0);
assert!(compactor_manager.get_compactor(context_id).is_none());
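        // Wait long enough for the restored task's heartbeat to expire.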
tokio::time::sleep(Duration::from_secs(2)).await;
let expired = compactor_manager.get_heartbeat_expired_tasks();
assert_eq!(expired.len(), 1);
assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);
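        // Progress reported for an unrelated task id must not refresh the expired task.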
        compactor_manager.update_task_heartbeats(&[CompactTaskProgress {
task_id: expired[0].task_id + 1,
num_ssts_sealed: 1,
num_ssts_uploaded: 1,
num_progress_key: 100,
..Default::default()
}]);
assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 1);
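        // Progress on the tracked task refreshes its heartbeat, so it is no longer expired.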
        compactor_manager.update_task_heartbeats(&[CompactTaskProgress {
task_id: expired[0].task_id,
num_ssts_sealed: 1,
num_ssts_uploaded: 1,
num_progress_key: 100,
..Default::default()
}]);
assert_eq!(compactor_manager.get_heartbeat_expired_tasks().len(), 0);
assert_eq!(compactor_manager.compactor_num(), 0);
assert!(compactor_manager.get_compactor(context_id).is_none());
compactor_manager.add_compactor(context_id);
assert_eq!(compactor_manager.compactor_num(), 1);
assert_eq!(
compactor_manager
.get_compactor(context_id)
.unwrap()
.context_id(),
context_id
);
compactor_manager.remove_compactor(context_id);
assert_eq!(compactor_manager.compactor_num(), 0);
assert!(compactor_manager.get_compactor(context_id).is_none());
}
}