use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::Instant;

use anyhow::Context;
use compaction_event_loop::{
    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
    HummockCompactorDedicatedEventLoop,
};
use fail::fail_point;
use itertools::Itertools;
use parking_lot::Mutex;
use rand::rng as thread_rng;
use rand::seq::SliceRandom;
use risingwave_common::catalog::TableId;
use risingwave_common::config::meta::default::compaction_config;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::level::Levels;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
};
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
    HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
};
use thiserror_ext::AsReport;
use tokio::sync::RwLockWriteGuard;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;
use tracing::warn;

use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::{
    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
};
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::{
    HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
    trigger_local_table_stat,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::next_compaction_task_id;
use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
use crate::manager::META_NODE_ID;
use crate::model::BTreeMapTransaction;

pub mod compaction_event_loop;
pub mod compaction_group_manager;
pub mod compaction_group_schedule;

static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
    [
        TaskStatus::ManualCanceled,
        TaskStatus::SendFailCanceled,
        TaskStatus::AssignFailCanceled,
        TaskStatus::HeartbeatCanceled,
        TaskStatus::InvalidGroupCanceled,
        TaskStatus::NoAvailMemoryResourceCanceled,
        TaskStatus::NoAvailCpuResourceCanceled,
        TaskStatus::HeartbeatProgressCanceled,
    ]
    .into_iter()
    .collect()
});

type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);

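/// Builds the default selector for every automatically scheduled task type; manual compaction
/// uses `ManualCompactionSelector` separately and is not registered here.
///
/// A minimal sketch of how the returned map is typically consumed (the lookup shown is an
/// assumption, not the only call site):
///
/// ```ignore
/// let mut selectors = init_selectors();
/// let selector = selectors
///     .get_mut(&compact_task::TaskType::Dynamic)
///     .expect("registered in init_selectors");
/// ```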
fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
        HashMap::default();
    compaction_selectors.insert(
        compact_task::TaskType::Dynamic,
        Box::<DynamicLevelSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::SpaceReclaim,
        Box::<SpaceReclaimCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Ttl,
        Box::<TtlCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Tombstone,
        Box::<TombstoneCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::VnodeWatermark,
        Box::<VnodeWatermarkCompactionSelector>::default(),
    );
    compaction_selectors
}

impl HummockVersionTransaction<'_> {
    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
        let mut version_delta = self.new_delta();
        let trivial_move = compact_task.is_trivial_move_task();
        version_delta.trivial_move = trivial_move;

        let group_deltas = &mut version_delta
            .group_deltas
            .entry(compact_task.compaction_group_id)
            .or_default()
            .group_deltas;
        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
            BTreeMap::default();

        for level in &compact_task.input_ssts {
            let level_idx = level.level_idx;

            removed_table_ids_map
                .entry(level_idx)
                .or_default()
                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
        }

        for (level_idx, removed_table_ids) in removed_table_ids_map {
            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
                level_idx,
                0, // l0_sub_level_id
                removed_table_ids,
                vec![], // inserted_table_infos
                0,      // vnode_partition_count
                compact_task.compaction_group_version_id,
            ));

            group_deltas.push(group_delta);
        }

        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
            compact_task.target_level,
            compact_task.target_sub_level_id,
            HashSet::new(), // removed_table_ids
            compact_task.sorted_output_ssts.clone(),
            compact_task.split_weight_by_vnode,
            compact_task.compaction_group_version_id,
        ));

        group_deltas.push(group_delta);
        version_delta.pre_apply();
    }
}

#[derive(Default)]
pub struct Compaction {
    /// Compaction tasks that are already assigned to a compactor.
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// `CompactStatus` of each compaction group.
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    pub _deterministic_mode: bool,
}

impl HummockManager {
    pub async fn get_assigned_compact_task_num(&self) -> u64 {
        self.compaction.read().await.compact_task_assignment.len() as u64
    }

    pub async fn list_compaction_status(
        &self,
    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
        let compaction = self.compaction.read().await;
        (
            compaction.compaction_statuses.values().map_into().collect(),
            compaction
                .compact_task_assignment
                .values()
                .cloned()
                .collect(),
        )
    }

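    /// Returns the per-level picker scores of the given compaction group, computed with a
    /// default `DynamicLevelSelectorCore`. Returns an empty vector if the group's status,
    /// levels, or config cannot be found.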
    pub async fn get_compaction_scores(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<PickerInfo> {
        let (status, levels, group) = {
            let compaction = self.compaction.read().await;
            let versioning = self.versioning.read().await;
            let config_manager = self.compaction_group_manager.read().await;
            match (
                compaction.compaction_statuses.get(&compaction_group_id),
                versioning.current_version.levels.get(&compaction_group_id),
                config_manager.try_get_compaction_group_config(compaction_group_id),
            ) {
                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
                _ => {
                    return vec![];
                }
            }
        };
        let dynamic_level_core = DynamicLevelSelectorCore::new(
            group.compaction_config,
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
        ctx.score_levels
    }
}

impl HummockManager {
    pub fn compaction_event_loop(
        hummock_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let hummock_compaction_event_handler =
            HummockCompactionEventHandler::new(hummock_manager.clone());

        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
            hummock_manager.clone(),
            hummock_compaction_event_handler.clone(),
        );

        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));

        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
            hummock_manager.env.opts.clone(),
            hummock_compaction_event_handler,
            Some(event_tx),
        );

        let event_loop = HummockCompactionEventLoop::new(
            hummock_compaction_event_dispatcher,
            hummock_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

    pub fn add_compactor_stream(
        &self,
        context_id: HummockContextId,
        req_stream: Streaming<SubscribeCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

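    /// Iterates the compaction groups in random order and returns the first one with a pending
    /// compaction request, together with its highest-priority task type.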
    pub async fn auto_pick_compaction_group_and_type(
        &self,
    ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                return Some((cg_id, pick_type));
            }
        }

        None
    }

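    /// Like `auto_pick_compaction_group_and_type`, but batches: all groups pending a `Dynamic`
    /// compaction are returned together, while a group pending any other task type is returned
    /// alone, and only if no dynamic group was seen before it.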
    async fn auto_pick_compaction_groups_and_type(
        &self,
    ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        let mut normal_groups = vec![];
        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                if pick_type == TaskType::Dynamic {
                    normal_groups.push(cg_id);
                } else if normal_groups.is_empty() {
                    return (vec![cg_id], pick_type);
                }
            }
        }
        (normal_groups, TaskType::Dynamic)
    }
}

impl HummockManager {
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            let task_id = next_compaction_task_id(&self.env).await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // Only GC delete keys when the target is the bottommost level of the
                    // group, where no older version of a key can exist below.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    max_kv_count_for_xor16: group_config.compaction_config.max_kv_count_for_xor16,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                        version
                            .latest_version()
                            .get_compaction_group_levels(compaction_group_id),
                    )
                    .await?;

                    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();

                    let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = version
                        .latest_version()
                        .safe_epoch_table_watermarks(&table_ids_to_be_compacted)
                        .into_iter()
                        .partition(|(_table_id, table_watermarks)| {
                            matches!(
                                table_watermarks.watermark_type,
                                WatermarkSerdeType::PkPrefix
                            )
                        });

                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;

                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            all_versioned_table_schemas.get(table_id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID,
                        },
                    );

                    pick_tasks.push(compact_task);
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // No trivial task touched the version, so only the compaction statuses and
            // assignments need to be persisted.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }

    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore err")
        )));
        let ret = self
            .cancel_compact_task_impl(vec![task_id], task_status)
            .await?;
        Ok(ret[0])
    }

    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }

    async fn cancel_compact_task_impl(
        &self,
        task_ids: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        assert!(CANCEL_STATUS_SET.contains(&task_status));
        let tasks = task_ids
            .into_iter()
            .map(|task_id| ReportTask {
                task_id,
                task_status,
                sorted_output_ssts: vec![],
                table_stats_change: HashMap::default(),
                object_timestamps: HashMap::default(),
            })
            .collect_vec();
        let rets = self.report_compact_tasks(tasks).await?;
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(rets)
    }

    async fn get_compact_tasks(
        &self,
        mut compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));
        compaction_groups.shuffle(&mut thread_rng());
        let (mut tasks, groups) = self
            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
            .await?;
        // Trivial tasks have already been applied and marked `Success`; filter them out so
        // callers only see tasks that still need a compactor.
        tasks.retain(|task| {
            if task.task_status == TaskStatus::Success {
                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
                false
            } else {
                true
            }
        });
        Ok((tasks, groups))
    }

    pub async fn get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<Option<CompactTask>> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));

        let (normal_tasks, _) = self
            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
            .await?;
        for task in normal_tasks {
            if task.task_status != TaskStatus::Success {
                return Ok(Some(task));
            }
            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
        }
        Ok(None)
    }

    pub async fn manual_get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<Option<CompactTask>> {
        let mut selector: Box<dyn CompactionSelector> =
            Box::new(ManualCompactionSelector::new(manual_compaction_option));
        self.get_compact_task(compaction_group_id, &mut selector)
            .await
    }

    pub async fn report_compact_task(
        &self,
        task_id: u64,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
        object_timestamps: HashMap<HummockSstableObjectId, u64>,
    ) -> Result<bool> {
        let rets = self
            .report_compact_tasks(vec![ReportTask {
                task_id,
                task_status,
                sorted_output_ssts,
                table_stats_change: table_stats_change.unwrap_or_default(),
                object_timestamps,
            }])
            .await?;
        Ok(rets[0])
    }

    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
        let compaction_guard = self.compaction.write().await;
        let versioning_guard = self.versioning.write().await;

        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
            .await
    }

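    /// Applies the reported task results while holding both the compaction and versioning
    /// write guards. Returns one flag per task: `true` if the report was accepted (including
    /// cancellations), `false` if the task was not found in the assignment table.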
    pub async fn report_compact_tasks_impl(
        &self,
        report_tasks: Vec<ReportTask>,
        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
    ) -> Result<Vec<bool>> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;
        let compaction: &mut Compaction = &mut compaction_guard;
        let start_time = Instant::now();
        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
        let mut rets = vec![false; report_tasks.len()];
        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
        let versioning: &mut Versioning = &mut versioning_guard;
        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");

        // Purge stale compaction statuses of removed compaction groups.
        for group_id in original_keys {
            if !versioning.current_version.levels.contains_key(&group_id) {
                compact_statuses.remove(group_id);
            }
        }
        let mut tasks = vec![];

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        let mut success_count = 0;
        for (idx, task) in report_tasks.into_iter().enumerate() {
            rets[idx] = true;
            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
                None => {
                    tracing::warn!("compact task {} not found", task.task_id);
                    rets[idx] = false;
                    continue;
                }
            };

            {
                compact_task.task_status = task.task_status;
                compact_task.sorted_output_ssts = task.sorted_output_ssts;
            }

            match compact_statuses.get_mut(compact_task.compaction_group_id) {
                Some(mut compact_status) => {
                    compact_status.report_compact_task(&compact_task);
                }
                None => {
                    // The compaction group has been removed; the task result is no longer valid.
                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
                }
            }

            let is_success = if let TaskStatus::Success = compact_task.task_status {
                match self
                    .report_compaction_sanity_check(&task.object_timestamps)
                    .await
                {
                    Err(e) => {
                        warn!(
                            "failed to commit compaction task {} {}",
                            compact_task.task_id,
                            e.as_report()
                        );
                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
                        false
                    }
                    _ => {
                        let group = version
                            .latest_version()
                            .levels
                            .get(&compact_task.compaction_group_id)
                            .unwrap();
                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
                        if is_expired {
                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
                            warn!(
                                "The task may be expired because of group split, task:\n {:?}",
                                compact_task_to_string(&compact_task)
                            );
                        }
                        !is_expired
                    }
                }
            } else {
                false
            };
            if is_success {
                success_count += 1;
                version.apply_compact_task(&compact_task);
                if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version())
                {
                    self.metrics.version_stats.reset();
                    versioning.local_metrics.clear();
                }
                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
                trigger_local_table_stat(
                    &self.metrics,
                    &mut versioning.local_metrics,
                    &version_stats,
                    &task.table_stats_change,
                );
            }
            tasks.push(compact_task);
        }
        if success_count > 0 {
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;

            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_report_task"])
                .observe(success_count as f64);
        } else {
            // No task was successfully committed; only persist status and assignment changes.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment
            )?;
        }

        let mut success_groups = vec![];
        for compact_task in &tasks {
            self.compactor_manager
                .remove_task_heartbeat(compact_task.task_id);
            tracing::trace!(
                "Reported compaction task. {}. cost time: {:?}",
                compact_task_to_string(compact_task),
                start_time.elapsed(),
            );

            if !deterministic_mode
                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
            {
                // Re-schedule a dynamic compaction for the group that just reported, to keep
                // the compaction pipeline running.
                self.try_send_compaction_request(
                    compact_task.compaction_group_id,
                    compact_task::TaskType::Dynamic,
                );
            }

            if compact_task.task_status == TaskStatus::Success {
                success_groups.push(compact_task.compaction_group_id);
            }
        }

        trigger_compact_tasks_stat(
            &self.metrics,
            &tasks,
            &compaction.compaction_statuses,
            &versioning_guard.current_version,
        );
        drop(versioning_guard);
        if !success_groups.is_empty() {
            self.try_update_write_limits(&success_groups).await;
        }
        Ok(rets)
    }

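    /// Triggers a `Dynamic` compaction for each of the given groups without waiting for the
    /// compactions to finish; used by the deterministic compaction test mode.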
    pub async fn trigger_compaction_deterministic(
        &self,
        _base_version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        self.on_current_version(|old_version| {
            tracing::info!(
                "Trigger compaction for version {}, groups {:?}",
                old_version.id,
                compaction_groups
            );
        })
        .await;

        if compaction_groups.is_empty() {
            return Ok(());
        }
        for compaction_group in compaction_groups {
            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
        }
        Ok(())
    }

    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<()> {
        let start_time = Instant::now();

        // 1. Pick an idle compactor.
        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction No compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        // 2. Select a manual compaction task.
        let compact_task = self
            .manual_get_compact_task(compaction_group, manual_compaction_option)
            .await;
        let compact_task = match compact_task {
            Ok(Some(compact_task)) => compact_task,
            Ok(None) => {
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };

        // 3. Send the task to the chosen compactor.
        let compact_task_string = compact_task_to_string(&compact_task);
        compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })?;

        tracing::info!(
            "Trigger manual compaction task. {}. cost time: {:?}",
            &compact_task_string,
            start_time.elapsed(),
        );

        Ok(())
    }

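    /// Requests a compaction run for the given group and task type via `CompactionState`.
    /// Returns `false` only when the request could not be scheduled; a request deduplicated by
    /// `CompactionState` still counts as success.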
    pub fn try_send_compaction_request(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) -> bool {
        match self
            .compaction_state
            .try_sched_compaction(compaction_group, task_type)
        {
            Ok(_) => true,
            Err(e) => {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to send compaction request for compaction group {}",
                    compaction_group,
                );
                false
            }
        }
    }

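    /// Tries to partition the compaction output along the single state table's vnode boundaries.
    /// This applies only when `vnode_aligned_level_size_threshold` is configured, the task
    /// targets the base level or deeper, it covers exactly one state table, and the target
    /// level's total size has reached the threshold. Returns `Ok(true)` once the vnode-aligned
    /// partition has been recorded in `table_vnode_partition`.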
    async fn try_apply_vnode_aligned_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
        levels: &Levels,
    ) -> Result<bool> {
        let Some(threshold) = compaction_config.vnode_aligned_level_size_threshold else {
            return Ok(false);
        };

        if compact_task.target_level < compact_task.base_level
            || compact_task.existing_table_ids.len() != 1
        {
            return Ok(false);
        }

        let target_level_size = levels
            .get_level(compact_task.target_level as usize)
            .total_file_size;

        if target_level_size < threshold {
            return Ok(false);
        }

        let table_id = compact_task.existing_table_ids[0];

        let table = self
            .metadata_manager
            .get_table_catalog_by_ids(&[table_id])
            .await
            .with_context(|| {
                format!(
                    "Failed to get table catalog for table_id {} in compaction_group {}",
                    table_id, compact_task.compaction_group_id
                )
            })
            .map_err(Error::Internal)?
            .into_iter()
            .next()
            .ok_or_else(|| {
                Error::Internal(anyhow::anyhow!(
                    "Table catalog not found for table_id {} in compaction_group {}",
                    table_id,
                    compact_task.compaction_group_id
                ))
            })?;

        compact_task
            .table_vnode_partition
            .insert(table_id, table.vnode_count() as u32);

        Ok(true)
    }

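    /// Decides the per-table vnode partition count for a compact task. If the group's config
    /// sets `split_weight_by_vnode`, every table in the task simply uses that weight; otherwise
    /// the count is derived from the table's estimated compacted size and recent write
    /// throughput: very large tables get `partition_vnode_count`, large or high-throughput
    /// tables get `hybrid_partition_node_count`, and any table above `target_file_size_base`
    /// gets a single dedicated partition.
    ///
    /// A worked example with hypothetical thresholds (low = 128 MB, high = 512 MB,
    /// `target_file_size_base` = 32 MB): a 1 GB table gets `partition_vnode_count` partitions,
    /// a 200 MB table gets `hybrid_partition_node_count`, a 40 MB table gets one partition,
    /// and a 20 MB table is left out of `table_vnode_partition` entirely.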
    fn apply_split_weight_by_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
    ) {
        if compaction_config.split_weight_by_vnode > 0 {
            for table_id in &compact_task.existing_table_ids {
                compact_task
                    .table_vnode_partition
                    .insert(*table_id, compact_task.split_weight_by_vnode);
            }

            return;
        }

        // Estimate the compacted size of each table from the input SSTs, attributing the size
        // of a shared SST evenly to the tables it contains.
        let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
        let mut existing_table_ids: HashSet<TableId> = HashSet::default();
        for input_ssts in &compact_task.input_ssts {
            for sst in &input_ssts.table_infos {
                existing_table_ids.extend(sst.table_ids.iter());
                for table_id in &sst.table_ids {
                    *table_size_info.entry(*table_id).or_default() +=
                        sst.sst_size / (sst.table_ids.len() as u64);
                }
            }
        }
        compact_task
            .existing_table_ids
            .retain(|table_id| existing_table_ids.contains(table_id));

        let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
        let default_partition_count = self.env.opts.partition_vnode_count;
        let compact_task_table_size_partition_threshold_low = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_low;
        let compact_task_table_size_partition_threshold_high = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_high;

        let table_write_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.read();
        let timestamp = chrono::Utc::now().timestamp();

        for (table_id, compact_table_size) in table_size_info {
            let write_throughput = table_write_throughput_statistic_manager
                .get_table_throughput_descending(table_id, timestamp)
                .peekable()
                .peek()
                .map(|item| item.throughput)
                .unwrap_or(0);

            if compact_table_size > compact_task_table_size_partition_threshold_high
                && default_partition_count > 0
            {
                compact_task
                    .table_vnode_partition
                    .insert(table_id, default_partition_count);
            } else if (compact_table_size > compact_task_table_size_partition_threshold_low
                || (write_throughput > self.env.opts.table_high_write_throughput_threshold
                    && compact_table_size > compaction_config.target_file_size_base))
                && hybrid_vnode_count > 0
            {
                // Partition tables with a large size or a high write throughput.
                compact_task
                    .table_vnode_partition
                    .insert(table_id, hybrid_vnode_count);
            } else if compact_table_size > compaction_config.target_file_size_base {
                // Use a single partition for a medium-sized table.
                compact_task.table_vnode_partition.insert(table_id, 1);
            }
        }

        compact_task
            .table_vnode_partition
            .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
    }

    pub(crate) async fn calculate_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
        levels: &Levels,
    ) -> Result<()> {
        // Prefer vnode-aligned partitioning when it applies.
        if self
            .try_apply_vnode_aligned_partition(compact_task, compaction_config, levels)
            .await?
        {
            return Ok(());
        }

        // Only partition tasks that compact into the base level; deeper levels keep the
        // default single partition.
        if compact_task.target_level > compact_task.base_level {
            return Ok(());
        }

        self.apply_split_weight_by_vnode_partition(compact_task, compaction_config);

        Ok(())
    }

    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockManager {
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction_guard = self.compaction.read().await;
        let assignment_ref = &compaction_guard.compact_task_assignment;
        assignment_ref.get(&task_id).cloned()
    }

    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let mut guard = self.compaction.write().await;
            guard.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0.into(),
                },
            );
        }

        self.report_compact_tasks(vec![ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        }])
        .await?;
        Ok(())
    }
}

#[derive(Debug, Default)]
pub struct CompactionState {
    scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
}

impl CompactionState {
    pub fn new() -> Self {
        Self {
            scheduled: Default::default(),
        }
    }

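    /// Enqueues a compaction request only if the same (group, task type) pair is not already
    /// queued; the pair is released again by `unschedule`.
    ///
    /// A minimal sketch of the dedup behavior (the group id `1` is arbitrary):
    ///
    /// ```ignore
    /// let state = CompactionState::new();
    /// assert!(state.try_sched_compaction(1, TaskType::Dynamic).unwrap()); // newly queued
    /// assert!(!state.try_sched_compaction(1, TaskType::Dynamic).unwrap()); // deduplicated
    /// state.unschedule(1, TaskType::Dynamic); // allows scheduling again
    /// ```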
    pub fn try_sched_compaction(
        &self,
        compaction_group: CompactionGroupId,
        task_type: TaskType,
    ) -> std::result::Result<bool, SendError<CompactionRequestChannelItem>> {
        let mut guard = self.scheduled.lock();
        let key = (compaction_group, task_type);
        if guard.contains(&key) {
            return Ok(false);
        }
        guard.insert(key);
        Ok(true)
    }

    pub fn unschedule(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) {
        self.scheduled.lock().remove(&(compaction_group, task_type));
    }

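    /// Returns the queued task type with the highest priority for the group, checked in the
    /// order `Dynamic` > `SpaceReclaim` > `Ttl` > `Tombstone` > `VnodeWatermark`.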
    pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
        let guard = self.scheduled.lock();
        if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
            Some(compact_task::TaskType::Dynamic)
        } else if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
            Some(compact_task::TaskType::SpaceReclaim)
        } else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
            Some(compact_task::TaskType::Ttl)
        } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
            Some(compact_task::TaskType::Tombstone)
        } else if guard.contains(&(group, compact_task::TaskType::VnodeWatermark)) {
            Some(compact_task::TaskType::VnodeWatermark)
        } else {
            None
        }
    }
}

impl Compaction {
    pub fn get_compact_task_assignments_by_group_id(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<CompactTaskAssignment> {
        self.compact_task_assignment
            .iter()
            .filter_map(|(_, assignment)| {
                if assignment
                    .compact_task
                    .as_ref()
                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
                {
                    Some(CompactTaskAssignment {
                        compact_task: assignment.compact_task.clone(),
                        context_id: assignment.context_id,
                    })
                } else {
                    None
                }
            })
            .collect()
    }
}

#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    pub group_id: CompactionGroupId,
    pub group_size: u64,
    pub table_statistic: BTreeMap<StateTableId, u64>,
    pub compaction_group_config: CompactionGroup,
}

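/// Proportionally shrinks the per-table stats after a `VnodeWatermark` trivial reclaim, which
/// drops whole SSTs without rewriting data, so the deleted key counts can be derived from the
/// input SSTs directly.
///
/// A worked example with hypothetical numbers: if a table had `total_key_count = 1000` and the
/// reclaimed SSTs carried 250 keys, the remaining ratio is 750 / 1000 = 0.75, and
/// `total_key_size` and `total_value_size` are scaled by the same 0.75 (rounded up).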
fn update_table_stats_for_vnode_watermark_trivial_reclaim(
    table_stats: &mut PbTableStatsMap,
    task: &CompactTask,
) {
    if task.task_type != TaskType::VnodeWatermark {
        return;
    }
    let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
        assert_eq!(s.table_ids.len(), 1);
        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
        *e += s.total_key_count;
    }
    for (table_id, delete_count) in deleted_table_keys {
        let Some(stats) = table_stats.get_mut(&table_id) else {
            continue;
        };
        if stats.total_key_count == 0 {
            continue;
        }
        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
        stats.total_key_count = new_total_key_count;
        // Scale key and value sizes by the same ratio, since whole SSTs were dropped.
        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
    }
}

#[derive(Debug, Clone)]
pub enum GroupState {
    Normal,

    /// The compaction group is under emergency, with the reason attached.
    Emergency(String),

    /// Writes to the compaction group are stopped, with the reason attached.
    WriteStop(String),
}

impl GroupState {
    pub fn is_write_stop(&self) -> bool {
        matches!(self, Self::WriteStop(_))
    }

    pub fn is_emergency(&self) -> bool {
        matches!(self, Self::Emergency(_))
    }

    pub fn reason(&self) -> Option<&str> {
        match self {
            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
            _ => None,
        }
    }
}

#[derive(Clone, Default)]
pub struct GroupStateValidator;

impl GroupStateValidator {
    pub fn write_stop_sub_level_count(
        level_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
        level_count > threshold
    }

    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
        l0_size
            > compaction_config
                .level0_stop_write_threshold_max_size
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
    }

    pub fn write_stop_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .level0_stop_write_threshold_max_sst_count
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
                as usize
    }

    pub fn emergency_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .emergency_level0_sst_file_count
                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
                as usize
    }

    pub fn emergency_l0_partition_count(
        last_l0_sub_level_partition_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        last_l0_sub_level_partition_count
            > compaction_config
                .emergency_level0_sub_level_partition
                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
                as usize
    }

    pub fn check_single_group_write_stop(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
                levels.l0.sub_levels.len(),
                compaction_config.level0_stop_write_threshold_sub_level_number
            ));
        }

        if Self::write_stop_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .level0_stop_write_threshold_max_sst_count
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
            ));
        }

        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
                levels.l0.total_file_size,
                compaction_config
                    .level0_stop_write_threshold_max_size
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
            ));
        }

        GroupState::Normal
    }

    pub fn check_single_group_emergency(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::emergency_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .emergency_level0_sst_file_count
                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
            ));
        }

        if Self::emergency_l0_partition_count(
            levels
                .l0
                .sub_levels
                .first()
                .map(|l| l.table_infos.len())
                .unwrap_or(0),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
                levels
                    .l0
                    .sub_levels
                    .first()
                    .map(|l| l.table_infos.len())
                    .unwrap_or(0),
                compaction_config
                    .emergency_level0_sub_level_partition
                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
            ));
        }

        GroupState::Normal
    }

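    /// Combines the write-stop and emergency checks: write stop takes precedence, and a group
    /// that passes both checks is reported as `Normal`.
    ///
    /// A minimal sketch of how a caller might branch on the result (the bindings are
    /// hypothetical):
    ///
    /// ```ignore
    /// let state = GroupStateValidator::group_state(&levels, &compaction_config);
    /// if let Some(reason) = state.reason() {
    ///     tracing::warn!(reason, is_write_stop = state.is_write_stop(), "group under pressure");
    /// }
    /// ```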
    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
        let state = Self::check_single_group_write_stop(levels, compaction_config);
        if state.is_write_stop() {
            return state;
        }

        Self::check_single_group_emergency(levels, compaction_config)
    }
}