use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::ops::DerefMut;
use std::sync::Arc;
use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::compact_task::ReportTask;
use risingwave_hummock_sdk::compaction_group::{
group_split, StateTableId, StaticCompactionGroupId,
};
use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
use risingwave_hummock_sdk::{can_concat, CompactionGroupId};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::{
CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
};
use thiserror_ext::AsReport;
use super::CompactionGroupStatistic;
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::HummockVersionTransaction;
use crate::hummock::manager::{commit_multi_var, HummockManager};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
impl HummockManager {
pub async fn merge_compaction_group(
&self,
group_1: CompactionGroupId,
group_2: CompactionGroupId,
) -> Result<()> {
self.merge_compaction_group_impl(group_1, group_2, None)
.await
}
pub async fn merge_compaction_group_for_test(
&self,
group_1: CompactionGroupId,
group_2: CompactionGroupId,
created_tables: HashSet<u32>,
) -> Result<()> {
self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
.await
}
pub async fn merge_compaction_group_impl(
&self,
group_1: CompactionGroupId,
group_2: CompactionGroupId,
created_tables: Option<HashSet<u32>>,
) -> Result<()> {
let compaction_guard = self.compaction.write().await;
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
if !versioning.current_version.levels.contains_key(&group_1) {
return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
}
if !versioning.current_version.levels.contains_key(&group_2) {
return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
}
let state_table_info = versioning.current_version.state_table_info.clone();
let mut member_table_ids_1 = state_table_info
.compaction_group_member_table_ids(group_1)
.iter()
.cloned()
.collect_vec();
let mut member_table_ids_2 = state_table_info
.compaction_group_member_table_ids(group_2)
.iter()
.cloned()
.collect_vec();
debug_assert!(!member_table_ids_1.is_empty());
debug_assert!(!member_table_ids_2.is_empty());
assert!(member_table_ids_1.is_sorted());
assert!(member_table_ids_2.is_sorted());
let created_tables = if let Some(created_tables) = created_tables {
assert!(cfg!(debug_assertions));
created_tables
} else {
match self.metadata_manager.get_created_table_ids().await {
Ok(created_tables) => HashSet::from_iter(created_tables),
Err(err) => {
tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
return Err(Error::CompactionGroup(format!(
"merge group_1 {} group_2 {} failed to fetch created table ids",
group_1, group_2
)));
}
}
};
fn contains_creating_table(
table_ids: &Vec<TableId>,
created_tables: &HashSet<u32>,
) -> bool {
table_ids
.iter()
.any(|table_id| !created_tables.contains(&table_id.table_id()))
}
if contains_creating_table(&member_table_ids_1, &created_tables)
|| contains_creating_table(&member_table_ids_2, &created_tables)
{
return Err(Error::CompactionGroup(format!(
"Not Merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
group_1, group_2, member_table_ids_1, member_table_ids_2
)));
}
let (left_group_id, right_group_id) =
if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
(group_1, group_2)
} else {
std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
(group_2, group_1)
};
if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
return Err(Error::CompactionGroup(format!(
"invalid merge group_1 {} group_2 {}",
left_group_id, right_group_id
)));
}
let combined_member_table_ids = member_table_ids_1
.iter()
.chain(member_table_ids_2.iter())
.collect_vec();
assert!(combined_member_table_ids.is_sorted());
let mut sst_id_set = HashSet::new();
for sst_id in versioning
.current_version
.get_sst_ids_by_group_id(left_group_id)
.chain(
versioning
.current_version
.get_sst_ids_by_group_id(right_group_id),
)
{
if !sst_id_set.insert(sst_id) {
return Err(Error::CompactionGroup(format!(
"invalid merge group_1 {} group_2 {} duplicated sst_id {}",
left_group_id, right_group_id, sst_id
)));
}
}
{
let left_levels = versioning
.current_version
.get_compaction_group_levels(group_1);
let right_levels = versioning
.current_version
.get_compaction_group_levels(group_2);
let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
for level_idx in 1..=max_level {
let left_level = left_levels.get_level(level_idx);
let right_level = right_levels.get_level(level_idx);
if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
continue;
}
let left_last_sst = left_level.table_infos.last().unwrap().clone();
let right_first_sst = right_level.table_infos.first().unwrap().clone();
let left_sst_id = left_last_sst.sst_id;
let right_sst_id = right_first_sst.sst_id;
let left_obj_id = left_last_sst.object_id;
let right_obj_id = right_first_sst.object_id;
if !can_concat(&[left_last_sst, right_first_sst]) {
return Err(Error::CompactionGroup(format!(
"invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
left_group_id, right_group_id, level_idx, left_sst_id, right_sst_id, left_obj_id, right_obj_id
)));
}
}
}
let mut version = HummockVersionTransaction::new(
&mut versioning.current_version,
&mut versioning.hummock_version_deltas,
self.env.notification_manager(),
&self.metrics,
);
let mut new_version_delta = version.new_delta();
let target_compaction_group_id = {
new_version_delta.group_deltas.insert(
left_group_id,
GroupDeltas {
group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
left_group_id,
right_group_id,
})],
},
);
left_group_id
};
new_version_delta.with_latest_version(|version, new_version_delta| {
for table_id in combined_member_table_ids {
let table_id = TableId::new(table_id.table_id());
let info = version
.state_table_info
.info()
.get(&table_id)
.expect("have check exist previously");
assert!(new_version_delta
.state_table_info_delta
.insert(
table_id,
PbStateTableInfoDelta {
committed_epoch: info.committed_epoch,
compaction_group_id: target_compaction_group_id,
}
)
.is_none());
}
});
{
let mut compaction_group_manager = self.compaction_group_manager.write().await;
let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
{
let right_group_max_level = new_version_delta
.latest_version()
.get_compaction_group_levels(right_group_id)
.levels
.len();
remove_compaction_group_in_sst_stat(
&self.metrics,
right_group_id,
right_group_max_level,
);
}
new_version_delta.pre_apply();
compaction_groups_txn.remove(right_group_id);
commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
}
versioning.mark_next_time_travel_version_snapshot();
let mut canceled_tasks = vec![];
let compact_task_assignments =
compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
compact_task_assignments
.into_iter()
.for_each(|task_assignment| {
if let Some(task) = task_assignment.compact_task.as_ref() {
assert_eq!(task.compaction_group_id, right_group_id);
canceled_tasks.push(ReportTask {
task_id: task.task_id,
task_status: TaskStatus::ManualCanceled,
table_stats_change: HashMap::default(),
sorted_output_ssts: vec![],
object_timestamps: HashMap::default(),
});
}
});
drop(versioning_guard);
drop(compaction_guard);
if !canceled_tasks.is_empty() {
self.report_compact_tasks(canceled_tasks).await?;
}
self.metrics
.merge_compaction_group_count
.with_label_values(&[&left_group_id.to_string()])
.inc();
Ok(())
}
}
impl HummockManager {
    /// Splits `parent_group_id` into two groups at the key position described by
    /// (`table_id_to_split`, `vnode_to_split`).
    ///
    /// Member tables sorting before the split position stay in the parent group;
    /// the rest move to a newly created group. Returns the resulting
    /// `(compaction_group_id, member_table_ids)` pairs (at most two entries). If
    /// the split point would leave one side empty, no split is performed and the
    /// parent group is returned unchanged.
    ///
    /// `split_table_ids` is only consulted to decide whether to apply
    /// `partition_vnode_count` to a resulting single-table group.
    async fn split_compaction_group_impl(
        &self,
        parent_group_id: CompactionGroupId,
        split_table_ids: &[StateTableId],
        table_id_to_split: StateTableId,
        vnode_to_split: VirtualNode,
        partition_vnode_count: Option<u32>,
    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
        let mut result = vec![];
        // Lock order: compaction first, then versioning; both held until after
        // the commit below.
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        // The parent group must exist and contain the table we split at.
        if !versioning
            .current_version
            .levels
            .contains_key(&parent_group_id)
        {
            return Err(Error::CompactionGroup(format!(
                "invalid group {}",
                parent_group_id
            )));
        }

        let member_table_ids = versioning
            .current_version
            .state_table_info
            .compaction_group_member_table_ids(parent_group_id)
            .iter()
            .map(|table_id| table_id.table_id)
            .collect::<BTreeSet<_>>();
        if !member_table_ids.contains(&table_id_to_split) {
            return Err(Error::CompactionGroup(format!(
                "table {} doesn't in group {}",
                table_id_to_split, parent_group_id
            )));
        }

        // Full key (table id + vnode) marking the split position.
        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);

        // `member_table_ids` is a BTreeSet, so `table_ids` comes out sorted.
        let table_ids = member_table_ids.into_iter().collect_vec();
        // Moving every member table would be a no-op split; reject it.
        if table_ids == split_table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }
        // Partition member tables into those before and after the split position.
        let (table_ids_left, table_ids_right) =
            group_split::split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_full_key.user_key.table_id.table_id(),
                split_full_key.user_key.get_vnode_id(),
            );
        // Degenerate split: one side is empty, so nothing to split — return the
        // parent group with whichever side is non-empty.
        if table_ids_left.is_empty() || table_ids_right.is_empty() {
            if !table_ids_left.is_empty() {
                result.push((parent_group_id, table_ids_left));
            }
            if !table_ids_right.is_empty() {
                result.push((parent_group_id, table_ids_right));
            }
            return Ok(result);
        }

        // The left tables stay in the parent group.
        result.push((parent_group_id, table_ids_left));
        let split_key: Bytes = split_full_key.encode().into();

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        // Pre-allocate object ids for the SSTs produced by cutting existing SSTs
        // at `split_key`.
        let split_sst_count = new_version_delta
            .latest_version()
            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());
        let new_sst_start_id = next_sstable_object_id(&self.env, split_sst_count).await?;
        let (new_compaction_group_id, config) = {
            // Create the new group with the default compaction config.
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            let config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config()
                .as_ref()
                .clone();
            // `table_ids` in `GroupConstruct` is deprecated; membership is
            // carried by the `state_table_info_delta` below.
            #[expect(deprecated)]
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id,
                        new_sst_start_id,
                        table_ids: vec![],
                        version: CompatibilityVersion::SplitGroupByTableId as i32,
                        split_key: Some(split_key.into()),
                    })],
                },
            );

            (new_compaction_group_id, config)
        };

        // Reassign the right-side tables to the new group in the version delta.
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for table_id in &table_ids_right {
                let table_id = TableId::new(*table_id);
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(new_version_delta
                    .state_table_info_delta
                    .insert(
                        table_id,
                        PbStateTableInfoDelta {
                            committed_epoch: info.committed_epoch,
                            compaction_group_id: new_compaction_group_id,
                        }
                    )
                    .is_none());
            }
        });
        result.push((new_compaction_group_id, table_ids_right));

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            // If the split produced a dedicated single-table group matching
            // `split_table_ids`, apply the requested vnode partitioning to it.
            for (cg_id, table_ids) in &result {
                if let Some(partition_vnode_count) = partition_vnode_count
                    && table_ids.len() == 1
                    && table_ids == split_table_ids
                {
                    if let Err(err) = compaction_groups_txn.update_compaction_config(
                        &[*cg_id],
                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                    ) {
                        // Non-fatal: the split itself still proceeds.
                        tracing::error!(
                            error = %err.as_report(),
                            "failed to update compaction config for group-{}",
                            cg_id
                        );
                    }
                }
            }

            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }
        versioning.mark_next_time_travel_version_snapshot();

        // Cancel compaction tasks on the parent group whose input SSTs no longer
        // exist (at their recorded levels) after the split.
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
        let levels = versioning
            .current_version
            .get_compaction_group_levels(parent_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    let input_sst_ids: HashSet<u64> = task
                        .input_ssts
                        .iter()
                        .flat_map(|level| level.table_infos.iter().map(|sst| sst.sst_id))
                        .collect();
                    let input_level_ids: Vec<u32> = task
                        .input_ssts
                        .iter()
                        .map(|level| level.level_idx)
                        .collect();
                    let need_cancel = !levels.check_sst_ids_exist(&input_level_ids, input_sst_ids);
                    if need_cancel {
                        canceled_tasks.push(ReportTask {
                            task_id: task.task_id,
                            task_status: TaskStatus::ManualCanceled,
                            table_stats_change: HashMap::default(),
                            sorted_output_ssts: vec![],
                            object_timestamps: HashMap::default(),
                        });
                    }
                }
            });

        // Release the locks before reporting; NOTE(review): `report_compact_tasks`
        // presumably re-acquires them — confirm against its implementation.
        drop(versioning_guard);
        drop(compaction_guard);

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks(canceled_tasks).await?;
        }

        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&parent_group_id.to_string()])
            .inc();

        Ok(result)
    }

    /// Moves `table_ids` (a sorted subset of `parent_group_id`'s members) into
    /// their own dedicated compaction group.
    ///
    /// Implemented as up to two splits:
    /// 1. split right before the first table of `table_ids` (vnode 0);
    /// 2. if that did not already isolate exactly `table_ids`, split again after
    ///    the last table (max representable vnode).
    ///
    /// Returns the dedicated group id plus a map from each affected group id to
    /// its member tables after the move.
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_string(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_string(),
            ));
        }

        let parent_table_ids = {
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .map(|table_id| table_id.table_id)
                .collect_vec()
        };
        // Moving the entire group would be a no-op; reject it.
        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

        // Invariants on the split results: per-group ids sorted, groups do not
        // interleave when ordered by first id, and no table id is duplicated.
        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<u64, Vec<u32>>) {
            {
                // Each group's member list is sorted.
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            {
                // Groups ordered by their first table id must not interleave.
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            {
                // No table id appears in more than one group.
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

        let mut cg_id_to_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();

        // First split: cut right before the first table to move (at vnode 0), so
        // the tables-to-move land in the right-hand result group.
        let table_id_to_split = *table_ids.first().unwrap();
        let mut target_compaction_group_id = 0;
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                // The group containing the first moved table is the move target.
                target_compaction_group_id = cg_id;
            }
            if table_ids_after_split == table_ids {
                // The first split already isolated exactly the requested tables.
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

        // Second split: cut after the last table to move (max representable
        // vnode), leaving the requested tables alone in their group.
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
}
impl HummockManager {
    /// Periodic scheduling entry for splitting: first tries to move individual
    /// high-write-throughput tables out of `group`, then tries to split the
    /// group if it has grown too large. No-op when auto group scheduling is
    /// disabled for this group.
    pub async fn try_split_compaction_group(
        &self,
        table_write_throughput: &HashMap<u32, VecDeque<u64>>,
        checkpoint_secs: u64,
        group: CompactionGroupStatistic,
    ) {
        // Respect the per-group opt-out.
        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
        {
            return;
        }
        // Try to give each high-throughput member table a dedicated group.
        for (table_id, table_size) in &group.table_statistic {
            self.try_move_high_throughput_table_to_dedicated_cg(
                table_write_throughput,
                table_id,
                table_size,
                checkpoint_secs,
                group.group_id,
            )
            .await;
        }

        self.try_split_huge_compaction_group(group).await;
    }

    /// Moves `table_id` into a dedicated compaction group when its recent write
    /// throughput qualifies as "high" over a full sampling window.
    ///
    /// The window length is `table_stat_throuput_window_seconds_for_split`
    /// expressed in checkpoint intervals; with fewer samples than that the
    /// table is skipped. Failures are logged only (best-effort).
    pub async fn try_move_high_throughput_table_to_dedicated_cg(
        &self,
        table_write_throughput: &HashMap<u32, VecDeque<u64>>,
        table_id: &u32,
        _table_size: &u64,
        checkpoint_secs: u64,
        parent_group_id: u64,
    ) {
        // No throughput history at all for this table.
        if !table_write_throughput.contains_key(table_id) {
            return;
        }

        // Number of throughput samples covering the configured window.
        let table_stat_throughput_sample_size =
            ((self.env.opts.table_stat_throuput_window_seconds_for_split as f64)
                / (checkpoint_secs as f64))
                .ceil() as usize;
        let table_throughput = table_write_throughput.get(table_id).unwrap();

        // Not enough history yet to make a decision.
        if table_throughput.len() < table_stat_throughput_sample_size {
            return;
        }

        let is_high_write_throughput = is_table_high_write_throughput(
            table_throughput,
            table_stat_throughput_sample_size,
            self.env.opts.table_high_write_throughput_threshold,
            self.env
                .opts
                .table_stat_high_write_throughput_ratio_for_split,
        );

        if !is_high_write_throughput {
            return;
        }

        let ret = self
            .move_state_tables_to_dedicated_compaction_group(
                parent_group_id,
                &[*table_id],
                Some(self.env.opts.partition_vnode_count),
            )
            .await;

        // Both outcomes are logged only; failure here is best-effort.
        match ret {
            Ok(split_result) => {
                tracing::info!("split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}", table_id, parent_group_id, self.env.opts.partition_vnode_count, split_result);
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to split state table [{}] from group-{}",
                    table_id,
                    parent_group_id,
                )
            }
        }
    }

    /// Splits a multi-table group roughly in half (by accumulated table size)
    /// once its size exceeds `max_estimated_group_size * split_group_size_ratio`.
    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
            * self.env.opts.split_group_size_ratio) as u64;
        // Only hybrid groups (more than one member table) above the size limit
        // qualify for this split.
        let is_huge_hybrid_group =
            group.group_size > group_max_size && group.table_statistic.len() > 1;
        if is_huge_hybrid_group {
            let mut accumulated_size = 0;
            let mut table_ids = Vec::default();
            // Accumulate tables (iteration order is asserted sorted) until the
            // prefix passes half the limit, then move that prefix into its own
            // group. At most one split attempt is made per call.
            for (table_id, table_size) in &group.table_statistic {
                accumulated_size += table_size;
                table_ids.push(*table_id);
                assert!(table_ids.is_sorted());
                if accumulated_size * 2 > group_max_size {
                    let ret = self
                        .move_state_tables_to_dedicated_compaction_group(
                            group.group_id,
                            &table_ids,
                            None,
                        )
                        .await;
                    match ret {
                        Ok(split_result) => {
                            tracing::info!(
                                "split_huge_compaction_group success {:?}",
                                split_result
                            );
                            self.metrics
                                .split_compaction_group_count
                                .with_label_values(&[&group.group_id.to_string()])
                                .inc();
                            return;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "failed to split_huge_compaction_group table {:?} from group-{}",
                                table_ids,
                                group.group_id
                            );
                            return;
                        }
                    }
                }
            }
        }
    }

    /// Attempts to merge `next_group` into `group`.
    ///
    /// A list of preconditions is checked first; each failure is returned as a
    /// `CompactionGroup` error describing why the merge was skipped. If all
    /// preconditions hold, the merge itself is attempted; a merge failure is
    /// only logged and the function still returns `Ok(())`.
    pub async fn try_merge_compaction_group(
        &self,
        table_write_throughput: &HashMap<u32, VecDeque<u64>>,
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        checkpoint_secs: u64,
        created_tables: &HashSet<u32>,
    ) -> Result<()> {
        // Never merge the two built-in static groups with each other.
        if group.group_id == StaticCompactionGroupId::StateDefault as u64
            && next_group.group_id == StaticCompactionGroupId::MaterializedView as u64
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} and group-{} are both StaticCompactionGroupId",
                group.group_id, next_group.group_id
            )));
        }

        // Both groups must have member tables.
        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} is empty",
                group.group_id, next_group.group_id
            )));
        }

        // Respect either group's opt-out of auto scheduling.
        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
            || next_group
                .compaction_group_config
                .compaction_config
                .disable_auto_group_scheduling
                .unwrap_or(false)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} disable_auto_group_scheduling",
                group.group_id, next_group.group_id
            )));
        }

        // Skip groups that still contain creating tables.
        if check_is_creating_compaction_group(group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge creating group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        // Sampling window for the low-write-throughput checks, in checkpoints.
        let table_stat_throughput_sample_size =
            ((self.env.opts.table_stat_throuput_window_seconds_for_merge as f64)
                / (checkpoint_secs as f64))
                .ceil() as usize;
        // Only merge when `group` itself has low write throughput.
        if !check_is_low_write_throughput_compaction_group(
            table_write_throughput,
            table_stat_throughput_sample_size,
            self.env.opts.table_low_write_throughput_threshold,
            group,
            self.env
                .opts
                .table_stat_low_write_throughput_ratio_for_merge,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge high throughput group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        // The combined size must stay under the split threshold, otherwise the
        // merged group would immediately qualify for splitting again.
        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
            * self.env.opts.split_group_size_ratio) as u64;
        if (group.group_size + next_group.group_size) > size_limit {
            return Err(Error::CompactionGroup(format!(
                "Not Merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
                group.group_id, group.group_size, next_group.group_id, next_group.group_size, size_limit
            )));
        }

        // The partner group must also be fully created...
        if check_is_creating_compaction_group(next_group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge creating group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        // ...and low-throughput as well.
        if !check_is_low_write_throughput_compaction_group(
            table_write_throughput,
            table_stat_throughput_sample_size,
            self.env.opts.table_low_write_throughput_threshold,
            next_group,
            self.env
                .opts
                .table_stat_low_write_throughput_ratio_for_merge,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge high throughput group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        // All preconditions hold — attempt the merge. Merge errors are logged,
        // not propagated.
        match self
            .merge_compaction_group(group.group_id, next_group.group_id)
            .await
        {
            Ok(()) => {
                tracing::info!(
                    "merge group-{} to group-{}",
                    next_group.group_id,
                    group.group_id,
                );

                self.metrics
                    .merge_compaction_group_count
                    .with_label_values(&[&group.group_id.to_string()])
                    .inc();
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to merge group-{} group-{}",
                    next_group.group_id,
                    group.group_id,
                )
            }
        }

        Ok(())
    }
}
/// Returns `true` when, within the most recent `sample_size` throughput samples,
/// the fraction of samples strictly above `threshold` strictly exceeds
/// `high_write_throughput_ratio`.
///
/// # Panics
/// Panics if fewer than `sample_size` samples are available.
pub fn is_table_high_write_throughput(
    table_throughput: &VecDeque<u64>,
    sample_size: usize,
    threshold: u64,
    high_write_throughput_ratio: f64,
) -> bool {
    assert!(table_throughput.len() >= sample_size);
    // Only the trailing `sample_size` entries form the sampling window.
    let window_start = table_throughput.len().saturating_sub(sample_size);
    let exceeding = table_throughput
        .iter()
        .skip(window_start)
        .filter(|&&throughput| throughput > threshold)
        .count();
    // Strict comparison: exactly hitting the ratio does not count as "high".
    exceeding as f64 > sample_size as f64 * high_write_throughput_ratio
}
/// Returns `true` when, within the most recent `sample_size` throughput samples,
/// the fraction of samples at or below `threshold` strictly exceeds
/// `low_write_throughput_ratio`.
///
/// # Panics
/// Panics if fewer than `sample_size` samples are available.
pub fn is_table_low_write_throughput(
    table_throughput: &VecDeque<u64>,
    sample_size: usize,
    threshold: u64,
    low_write_throughput_ratio: f64,
) -> bool {
    assert!(table_throughput.len() >= sample_size);
    // Only the trailing `sample_size` entries form the sampling window.
    let window_start = table_throughput.len().saturating_sub(sample_size);
    let at_or_below = table_throughput
        .iter()
        .skip(window_start)
        .filter(|&&throughput| throughput <= threshold)
        .count();
    // Strict comparison: exactly hitting the ratio does not count as "low".
    at_or_below as f64 > sample_size as f64 * low_write_throughput_ratio
}
/// Returns `true` when every member table of `group` that has at least
/// `sample_size` throughput samples is low-write-throughput (per
/// [`is_table_low_write_throughput`]).
///
/// Returns `false` if no member table has enough samples — without data the
/// group is not considered mergeable.
fn check_is_low_write_throughput_compaction_group(
    table_write_throughput: &HashMap<u32, VecDeque<u64>>,
    sample_size: usize,
    threshold: u64,
    group: &CompactionGroupStatistic,
    low_write_throughput_ratio: f64,
) -> bool {
    // Collect member tables that have a full sampling window of history.
    let mut eligible_tables = Vec::new();
    for table_id in group.table_statistic.keys() {
        if let Some(history) = table_write_throughput.get(table_id) {
            if history.len() >= sample_size {
                eligible_tables.push(*table_id);
            }
        }
    }

    if eligible_tables.is_empty() {
        return false;
    }

    // Every eligible table must individually qualify as low-throughput.
    eligible_tables.iter().all(|table_id| {
        let history = table_write_throughput.get(table_id).unwrap();
        is_table_low_write_throughput(history, sample_size, threshold, low_write_throughput_ratio)
    })
}
/// Returns `true` when `group` contains at least one member table that is not in
/// `created_tables`, i.e. a table whose creation has not finished yet.
fn check_is_creating_compaction_group(
    group: &CompactionGroupStatistic,
    created_tables: &HashSet<u32>,
) -> bool {
    for table_id in group.table_statistic.keys() {
        if !created_tables.contains(table_id) {
            // Found a table that is still being created.
            return true;
        }
    }
    false
}