use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::Instant;

use anyhow::Context;
use compaction_event_loop::{
    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
    HummockCompactorDedicatedEventLoop,
};
use fail::fail_point;
use itertools::Itertools;
use parking_lot::Mutex;
use rand::rng as thread_rng;
use rand::seq::SliceRandom;
use risingwave_common::catalog::TableId;
use risingwave_common::config::meta::default::compaction_config;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::level::Levels;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
};
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
    HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
};
use thiserror_ext::AsReport;
use tokio::sync::RwLockWriteGuard;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;
use tracing::warn;

use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::{
    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
};
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::{
    HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
    trigger_local_table_stat,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::next_compaction_task_id;
use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
use crate::manager::META_NODE_ID;
use crate::model::BTreeMapTransaction;

pub mod compaction_event_loop;
pub mod compaction_group_manager;
pub mod compaction_group_schedule;

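/// Task statuses that represent a cancelled compaction task; `cancel_compact_task_impl`
/// asserts that the status it reports is a member of this set.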
static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
    [
        TaskStatus::ManualCanceled,
        TaskStatus::SendFailCanceled,
        TaskStatus::AssignFailCanceled,
        TaskStatus::HeartbeatCanceled,
        TaskStatus::InvalidGroupCanceled,
        TaskStatus::NoAvailMemoryResourceCanceled,
        TaskStatus::NoAvailCpuResourceCanceled,
        TaskStatus::HeartbeatProgressCanceled,
    ]
    .into_iter()
    .collect()
});

type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);

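/// Builds the default registry of compaction selectors, one per schedulable task type.
/// Manual compaction instead constructs a `ManualCompactionSelector` on demand.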
fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
        HashMap::default();
    compaction_selectors.insert(
        compact_task::TaskType::Dynamic,
        Box::<DynamicLevelSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::SpaceReclaim,
        Box::<SpaceReclaimCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Ttl,
        Box::<TtlCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Tombstone,
        Box::<TombstoneCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::VnodeWatermark,
        Box::<VnodeWatermarkCompactionSelector>::default(),
    );
    compaction_selectors
}

impl HummockVersionTransaction<'_> {
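    /// Pre-applies a compact task to this version transaction: one intra-level delta per
    /// input level removing the input SSTs, then a delta inserting the output SSTs into the
    /// target (sub-)level.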
    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
        let mut version_delta = self.new_delta();
        let trivial_move = compact_task.is_trivial_move_task();
        version_delta.trivial_move = trivial_move;

        let group_deltas = &mut version_delta
            .group_deltas
            .entry(compact_task.compaction_group_id)
            .or_default()
            .group_deltas;
        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
            BTreeMap::default();

        for level in &compact_task.input_ssts {
            let level_idx = level.level_idx;

            removed_table_ids_map
                .entry(level_idx)
                .or_default()
                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
        }

        for (level_idx, removed_table_ids) in removed_table_ids_map {
            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
                level_idx,
                0, // sub-level id: unused for removal-only deltas
                removed_table_ids,
                vec![], // no SSTs are inserted here
                0, // vnode partition count: unused for removal-only deltas
                compact_task.compaction_group_version_id,
            ));

            group_deltas.push(group_delta);
        }

        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
            compact_task.target_level,
            compact_task.target_sub_level_id,
            HashSet::new(), // no SSTs are removed here
            compact_task.sorted_output_ssts.clone(),
            compact_task.split_weight_by_vnode,
            compact_task.compaction_group_version_id,
        ));

        group_deltas.push(group_delta);
        version_delta.pre_apply();
    }
}

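/// In-memory bookkeeping of compaction state, persisted through `BTreeMapTransaction`s
/// when tasks are picked or reported.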
#[derive(Default)]
pub struct Compaction {
    /// Compaction tasks that have been assigned to compactors, keyed by task id.
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// Per-group compact status, keyed by compaction group id.
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    pub _deterministic_mode: bool,
}

impl HummockManager {
    pub async fn get_assigned_compact_task_num(&self) -> u64 {
        self.compaction.read().await.compact_task_assignment.len() as u64
    }

    pub async fn list_compaction_status(
        &self,
    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
        let compaction = self.compaction.read().await;
        (
            compaction.compaction_statuses.values().map_into().collect(),
            compaction
                .compact_task_assignment
                .values()
                .cloned()
                .collect(),
        )
    }

    pub async fn get_compaction_scores(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<PickerInfo> {
        let (status, levels, group) = {
            let compaction = self.compaction.read().await;
            let versioning = self.versioning.read().await;
            let config_manager = self.compaction_group_manager.read().await;
            match (
                compaction.compaction_statuses.get(&compaction_group_id),
                versioning.current_version.levels.get(&compaction_group_id),
                config_manager.try_get_compaction_group_config(compaction_group_id),
            ) {
                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
                _ => {
                    return vec![];
                }
            }
        };
        let dynamic_level_core = DynamicLevelSelectorCore::new(
            group.compaction_config,
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
        ctx.score_levels
    }
}

impl HummockManager {
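    /// Spawns the compaction event loops: a dedicated loop for compactor events handled by
    /// `HummockCompactorDedicatedEventLoop`, and a dispatcher loop that consumes
    /// `compactor_streams_change_rx`. Returns each loop's join handle and shutdown sender.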
    pub fn compaction_event_loop(
        hummock_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let hummock_compaction_event_handler =
            HummockCompactionEventHandler::new(hummock_manager.clone());

        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
            hummock_manager.clone(),
            hummock_compaction_event_handler.clone(),
        );

        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));

        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
            hummock_manager.env.opts.clone(),
            hummock_compaction_event_handler,
            Some(event_tx),
        );

        let event_loop = HummockCompactionEventLoop::new(
            hummock_compaction_event_dispatcher,
            hummock_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

    pub fn add_compactor_stream(
        &self,
        context_id: HummockContextId,
        req_stream: Streaming<SubscribeCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

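    /// Randomly walks the compaction groups and returns the first one that has a pending
    /// compaction request, together with its task type.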
    pub async fn auto_pick_compaction_group_and_type(
        &self,
    ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                return Some((cg_id, pick_type));
            }
        }

        None
    }

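    /// Batched variant of `auto_pick_compaction_group_and_type`: collects every group whose
    /// pending request is `Dynamic`, but returns a single group immediately if a non-dynamic
    /// request is found before any dynamic one.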
    async fn auto_pick_compaction_groups_and_type(
        &self,
    ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        let mut normal_groups = vec![];
        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                if pick_type == TaskType::Dynamic {
                    normal_groups.push(cg_id);
                } else if normal_groups.is_empty() {
                    return (vec![cg_id], pick_type);
                }
            }
        }
        (normal_groups, TaskType::Dynamic)
    }
}

impl HummockManager {
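    /// Picks up to `max_select_count` compaction tasks from `compaction_groups` with the given
    /// selector. Trivial-move and trivial-reclaim tasks are applied to the version and
    /// committed inside this call; all picked tasks are returned together with the groups for
    /// which nothing could be scheduled.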
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // Hold the config-manager read lock only long enough to fetch this group's config.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            let task_id = next_compaction_task_id(&self.env).await?;

            // Initialize the compact status for a group seen for the first time.
            if !compaction_statuses.contains_key(&compaction_group_id) {
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // GC delete keys only when writing to the last level, since older
                    // versions of a key may still live in lower levels.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    max_kv_count_for_xor16: group_config.compaction_config.max_kv_count_for_xor16,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                        version
                            .latest_version()
                            .get_compaction_group_levels(compaction_group_id),
                    )
                    .await?;

                    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();

                    // Bucket the safe-epoch table watermarks by their serde type.
                    let mut pk_prefix_table_watermarks = BTreeMap::default();
                    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
                    let mut value_table_watermarks = BTreeMap::default();
                    for (table_id, watermark) in version
                        .latest_version()
                        .safe_epoch_table_watermarks(&table_ids_to_be_compacted)
                    {
                        match watermark.watermark_type {
                            WatermarkSerdeType::PkPrefix => {
                                pk_prefix_table_watermarks.insert(table_id, watermark);
                            }
                            WatermarkSerdeType::NonPkPrefix => {
                                non_pk_prefix_table_watermarks.insert(table_id, watermark);
                            }
                            WatermarkSerdeType::Value => {
                                value_table_watermarks.insert(table_id, watermark);
                            }
                        }
                    }
                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;
                    compact_task.value_table_watermarks = value_table_watermarks;

                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            all_versioned_table_schemas.get(table_id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID,
                        },
                    );

                    pick_tasks.push(compact_task);
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // No trivial task changed the version, so the version transaction need not be
            // committed; release the versioning lock early and persist only the task state.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Track the task with a heartbeat so stalled compactors can be detected.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // The task is dispatched in the pending state.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }

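    /// Cancels a single compaction task. `task_status` must be one of `CANCEL_STATUS_SET`.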
    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore err")
        )));
        let ret = self
            .cancel_compact_task_impl(vec![task_id], task_status)
            .await?;
        Ok(ret[0])
    }

    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }

    async fn cancel_compact_task_impl(
        &self,
        task_ids: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        assert!(CANCEL_STATUS_SET.contains(&task_status));
        let tasks = task_ids
            .into_iter()
            .map(|task_id| ReportTask {
                task_id,
                task_status,
                sorted_output_ssts: vec![],
                table_stats_change: HashMap::default(),
                object_timestamps: HashMap::default(),
            })
            .collect_vec();
        let rets = self.report_compact_tasks(tasks).await?;
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(rets)
    }

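    /// Shuffles the candidate groups and delegates to `get_compact_tasks_impl`, filtering out
    /// trivial tasks that were already committed as successful.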
    async fn get_compact_tasks(
        &self,
        mut compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));
        compaction_groups.shuffle(&mut thread_rng());
        let (mut tasks, groups) = self
            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
            .await?;
        tasks.retain(|task| {
            if task.task_status == TaskStatus::Success {
                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
                false
            } else {
                true
            }
        });
        Ok((tasks, groups))
    }

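    /// Picks at most one compaction task for `compaction_group_id` using `selector`; trivial
    /// tasks picked along the way are committed internally and skipped here.
    ///
    /// A minimal usage sketch (`manager` and `group_id` assumed to be in scope):
    /// ```ignore
    /// let mut selector: Box<dyn CompactionSelector> = Box::<DynamicLevelSelector>::default();
    /// if let Some(task) = manager.get_compact_task(group_id, &mut selector).await? {
    ///     // dispatch `task` to a compactor
    /// }
    /// ```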
    pub async fn get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<Option<CompactTask>> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));

        let (normal_tasks, _) = self
            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
            .await?;
        for task in normal_tasks {
            if task.task_status != TaskStatus::Success {
                return Ok(Some(task));
            }
            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
        }
        Ok(None)
    }

    pub async fn manual_get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<Option<CompactTask>> {
        let mut selector: Box<dyn CompactionSelector> =
            Box::new(ManualCompactionSelector::new(manual_compaction_option));
        self.get_compact_task(compaction_group_id, &mut selector)
            .await
    }

    pub async fn report_compact_task(
        &self,
        task_id: u64,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
        object_timestamps: HashMap<HummockSstableObjectId, u64>,
    ) -> Result<bool> {
        let rets = self
            .report_compact_tasks(vec![ReportTask {
                task_id,
                task_status,
                sorted_output_ssts,
                table_stats_change: table_stats_change.unwrap_or_default(),
                object_timestamps,
            }])
            .await?;
        Ok(rets[0])
    }

    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
        let compaction_guard = self.compaction.write().await;
        let versioning_guard = self.versioning.write().await;

        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
            .await
    }

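    /// Applies the reported outcome of each task while holding the compaction and versioning
    /// write locks: successful tasks are sanity-checked and applied to the version, stale or
    /// unknown tasks are rejected. The returned booleans indicate, per task, whether the
    /// report was accepted.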
    pub async fn report_compact_tasks_impl(
        &self,
        report_tasks: Vec<ReportTask>,
        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
    ) -> Result<Vec<bool>> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;
        let compaction: &mut Compaction = &mut compaction_guard;
        let start_time = Instant::now();
        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
        let mut rets = vec![false; report_tasks.len()];
        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
        let versioning: &mut Versioning = &mut versioning_guard;
        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");

        // Purge the compact status of any group that no longer exists in the current version.
        for group_id in original_keys {
            if !versioning.current_version.levels.contains_key(&group_id) {
                compact_statuses.remove(group_id);
            }
        }
        let mut tasks = vec![];

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        let mut success_count = 0;
        for (idx, task) in report_tasks.into_iter().enumerate() {
            rets[idx] = true;
            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
                None => {
                    tracing::warn!("compact task {} not found", task.task_id);
                    rets[idx] = false;
                    continue;
                }
            };

            {
                compact_task.task_status = task.task_status;
                compact_task.sorted_output_ssts = task.sorted_output_ssts;
            }

            match compact_statuses.get_mut(compact_task.compaction_group_id) {
                Some(mut compact_status) => {
                    compact_status.report_compact_task(&compact_task);
                }
                None => {
                    // The group may have been dropped while the task was running.
                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
                }
            }

            let is_success = if let TaskStatus::Success = compact_task.task_status {
                match self
                    .report_compaction_sanity_check(&task.object_timestamps)
                    .await
                {
                    Err(e) => {
                        warn!(
                            "failed to commit compaction task {} {}",
                            compact_task.task_id,
                            e.as_report()
                        );
                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
                        false
                    }
                    _ => {
                        let group = version
                            .latest_version()
                            .levels
                            .get(&compact_task.compaction_group_id)
                            .unwrap();
                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
                        if is_expired {
                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
                            warn!(
                                "The task may be expired because of group split, task:\n {:?}",
                                compact_task_to_string(&compact_task)
                            );
                        }
                        !is_expired
                    }
                }
            } else {
                false
            };
            if is_success {
                success_count += 1;
                version.apply_compact_task(&compact_task);
                if purge_prost_table_stats(
                    &mut version_stats.table_stats,
                    version.latest_version(),
                    &HashSet::default(),
                ) {
                    self.metrics.version_stats.reset();
                    versioning.local_metrics.clear();
                }
                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
                trigger_local_table_stat(
                    &self.metrics,
                    &mut versioning.local_metrics,
                    &version_stats,
                    &task.table_stats_change,
                );
            }
            tasks.push(compact_task);
        }
        if success_count > 0 {
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;

            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_report_task"])
                .observe(success_count as f64);
        } else {
            // Nothing changed the version; persist only the task state.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment
            )?;
        }

        let mut success_groups = vec![];
        for compact_task in &tasks {
            self.compactor_manager
                .remove_task_heartbeat(compact_task.task_id);
            tracing::trace!(
                "Reported compaction task. {}. cost time: {:?}",
                compact_task_to_string(compact_task),
                start_time.elapsed(),
            );

            if !deterministic_mode
                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
            {
                // Keep the group compacting: schedule a follow-up dynamic compaction.
                self.try_send_compaction_request(
                    compact_task.compaction_group_id,
                    compact_task::TaskType::Dynamic,
                );
            }

            if compact_task.task_status == TaskStatus::Success {
                success_groups.push(compact_task.compaction_group_id);
            }
        }

        trigger_compact_tasks_stat(
            &self.metrics,
            &tasks,
            &compaction.compaction_statuses,
            &versioning_guard.current_version,
        );
        drop(versioning_guard);
        if !success_groups.is_empty() {
            self.try_update_write_limits(&success_groups).await;
        }
        Ok(rets)
    }

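    /// Triggers dynamic compaction for the given groups at the current version; used by the
    /// deterministic compaction test.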
    pub async fn trigger_compaction_deterministic(
        &self,
        _base_version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        self.on_current_version(|old_version| {
            tracing::info!(
                "Trigger compaction for version {}, groups {:?}",
                old_version.id,
                compaction_groups
            );
        })
        .await;

        if compaction_groups.is_empty() {
            return Ok(());
        }
        for compaction_group in compaction_groups {
            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
        }
        Ok(())
    }

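    /// Picks a manual compaction task for the group and sends it directly to an available
    /// compactor, bypassing the scheduling loop.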
    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<()> {
        let start_time = Instant::now();

        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction: no compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction: no compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        let compact_task = self
            .manual_get_compact_task(compaction_group, manual_compaction_option)
            .await;
        let compact_task = match compact_task {
            Ok(Some(compact_task)) => compact_task,
            Ok(None) => {
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction: no compaction task is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };

        let compact_task_string = compact_task_to_string(&compact_task);
        compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })?;

        tracing::info!(
            "Trigger manual compaction task. {}. cost time: {:?}",
            &compact_task_string,
            start_time.elapsed(),
        );

        Ok(())
    }

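    /// Enqueues a compaction request for the scheduler; logs an error and returns `false` if
    /// scheduling fails. A duplicate request that is already pending still returns `true`.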
    pub fn try_send_compaction_request(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) -> bool {
        match self
            .compaction_state
            .try_sched_compaction(compaction_group, task_type)
        {
            Ok(_) => true,
            Err(e) => {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to send compaction request for compaction group {}",
                    compaction_group,
                );
                false
            }
        }
    }

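    /// Vnode-aligned partitioning: when a single-table task targets a level whose size has
    /// reached `vnode_aligned_level_size_threshold`, partition its output by the table's full
    /// vnode count. Returns `Ok(true)` if the partition was applied.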
    async fn try_apply_vnode_aligned_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
        levels: &Levels,
    ) -> Result<bool> {
        let Some(threshold) = compaction_config.vnode_aligned_level_size_threshold else {
            return Ok(false);
        };

        // Only consider tasks that write to the base level or below and touch exactly one
        // state table.
        if compact_task.target_level < compact_task.base_level
            || compact_task.existing_table_ids.len() != 1
        {
            return Ok(false);
        }

        let target_level_size = levels
            .get_level(compact_task.target_level as usize)
            .total_file_size;

        if target_level_size < threshold {
            return Ok(false);
        }

        let table_id = compact_task.existing_table_ids[0];

        let table = self
            .metadata_manager
            .get_table_catalog_by_ids(&[table_id])
            .await
            .with_context(|| {
                format!(
                    "Failed to get table catalog for table_id {} in compaction_group {}",
                    table_id, compact_task.compaction_group_id
                )
            })
            .map_err(Error::Internal)?
            .into_iter()
            .next()
            .ok_or_else(|| {
                Error::Internal(anyhow::anyhow!(
                    "Table catalog not found for table_id {} in compaction_group {}",
                    table_id,
                    compact_task.compaction_group_id
                ))
            })?;

        compact_task
            .table_vnode_partition
            .insert(table_id, table.vnode_count() as u32);

        Ok(true)
    }

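    /// Size/throughput-based partitioning: assigns each table in the task a vnode partition
    /// count derived from its estimated compacted size and recent write throughput, unless a
    /// fixed `split_weight_by_vnode` is configured.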
    fn apply_split_weight_by_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
    ) {
        if compaction_config.split_weight_by_vnode > 0 {
            // A configured split weight overrides the size/throughput heuristics below.
            for table_id in &compact_task.existing_table_ids {
                compact_task
                    .table_vnode_partition
                    .insert(*table_id, compact_task.split_weight_by_vnode);
            }

            return;
        }

        // Estimate each table's share of the input SSTs' sizes.
        let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
        let mut existing_table_ids: HashSet<TableId> = HashSet::default();
        for input_ssts in &compact_task.input_ssts {
            for sst in &input_ssts.table_infos {
                existing_table_ids.extend(sst.table_ids.iter());
                for table_id in &sst.table_ids {
                    *table_size_info.entry(*table_id).or_default() +=
                        sst.sst_size / (sst.table_ids.len() as u64);
                }
            }
        }
        compact_task
            .existing_table_ids
            .retain(|table_id| existing_table_ids.contains(table_id));

        let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
        let default_partition_count = self.env.opts.partition_vnode_count;
        let compact_task_table_size_partition_threshold_low = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_low;
        let compact_task_table_size_partition_threshold_high = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_high;

        let table_write_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.read();
        let timestamp = chrono::Utc::now().timestamp();

        for (table_id, compact_table_size) in table_size_info {
            let write_throughput = table_write_throughput_statistic_manager
                .get_table_throughput_descending(table_id, timestamp)
                .peekable()
                .peek()
                .map(|item| item.throughput)
                .unwrap_or(0);

            if compact_table_size > compact_task_table_size_partition_threshold_high
                && default_partition_count > 0
            {
                compact_task
                    .table_vnode_partition
                    .insert(table_id, default_partition_count);
            } else if (compact_table_size > compact_task_table_size_partition_threshold_low
                || (write_throughput > self.env.opts.table_high_write_throughput_threshold
                    && compact_table_size > compaction_config.target_file_size_base))
                && hybrid_vnode_count > 0
            {
                // Large or write-heavy tables get the hybrid vnode partition count.
                compact_task
                    .table_vnode_partition
                    .insert(table_id, hybrid_vnode_count);
            } else if compact_table_size > compaction_config.target_file_size_base {
                // Moderately sized tables get a single dedicated partition.
                compact_task.table_vnode_partition.insert(table_id, 1);
            }
        }

        compact_task
            .table_vnode_partition
            .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
    }

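    /// Decides vnode partitioning for the task's output: tries the vnode-aligned rule first,
    /// then falls back to the size/throughput heuristics for tasks targeting the base level.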
    pub(crate) async fn calculate_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
        levels: &Levels,
    ) -> Result<()> {
        if self
            .try_apply_vnode_aligned_partition(compact_task, compaction_config, levels)
            .await?
        {
            return Ok(());
        }

        // Per-table vnode partitioning only applies to tasks that write to L0 or the base level.
        if compact_task.target_level > compact_task.base_level {
            return Ok(());
        }

        self.apply_split_weight_by_vnode_partition(compact_task, compaction_config);

        Ok(())
    }

    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockManager {
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction_guard = self.compaction.read().await;
        let assignment_ref = &compaction_guard.compact_task_assignment;
        assignment_ref.get(&task_id).cloned()
    }

    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let mut guard = self.compaction.write().await;
            guard.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0.into(),
                },
            );
        }

        self.report_compact_tasks(vec![ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        }])
        .await?;
        Ok(())
    }
}

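/// Deduplicates pending compaction requests per (compaction group, task type) pair.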
#[derive(Debug, Default)]
pub struct CompactionState {
    scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
}

impl CompactionState {
    pub fn new() -> Self {
        Self {
            scheduled: Default::default(),
        }
    }

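    /// Marks a (group, task type) request as scheduled. Returns `Ok(false)` if an identical
    /// request is already pending.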
    pub fn try_sched_compaction(
        &self,
        compaction_group: CompactionGroupId,
        task_type: TaskType,
    ) -> std::result::Result<bool, SendError<CompactionRequestChannelItem>> {
        let mut guard = self.scheduled.lock();
        let key = (compaction_group, task_type);
        if guard.contains(&key) {
            return Ok(false);
        }
        guard.insert(key);
        Ok(true)
    }

    pub fn unschedule(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) {
        self.scheduled.lock().remove(&(compaction_group, task_type));
    }

    /// Returns the pending task type with the highest scheduling priority for the group,
    /// checked in the order Dynamic, SpaceReclaim, Ttl, Tombstone, VnodeWatermark.
    pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
        let guard = self.scheduled.lock();
        if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
            Some(compact_task::TaskType::Dynamic)
        } else if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
            Some(compact_task::TaskType::SpaceReclaim)
        } else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
            Some(compact_task::TaskType::Ttl)
        } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
            Some(compact_task::TaskType::Tombstone)
        } else if guard.contains(&(group, compact_task::TaskType::VnodeWatermark)) {
            Some(compact_task::TaskType::VnodeWatermark)
        } else {
            None
        }
    }
}

impl Compaction {
    pub fn get_compact_task_assignments_by_group_id(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<CompactTaskAssignment> {
        self.compact_task_assignment
            .iter()
            .filter_map(|(_, assignment)| {
                if assignment
                    .compact_task
                    .as_ref()
                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
                {
                    Some(CompactTaskAssignment {
                        compact_task: assignment.compact_task.clone(),
                        context_id: assignment.context_id,
                    })
                } else {
                    None
                }
            })
            .collect()
    }
}

#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    pub group_id: CompactionGroupId,
    pub group_size: u64,
    pub table_statistic: BTreeMap<StateTableId, u64>,
    pub compaction_group_config: CompactionGroup,
}

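/// For a `VnodeWatermark` trivial reclaim the input SSTs are dropped wholesale, so table
/// stats can be adjusted up front: key counts are reduced exactly, while key and value sizes
/// are scaled down proportionally as an estimate.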
fn update_table_stats_for_vnode_watermark_trivial_reclaim(
    table_stats: &mut PbTableStatsMap,
    task: &CompactTask,
) {
    if task.task_type != TaskType::VnodeWatermark {
        return;
    }
    let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
        assert_eq!(s.table_ids.len(), 1);
        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
        *e += s.total_key_count;
    }
    for (table_id, delete_count) in deleted_table_keys {
        let Some(stats) = table_stats.get_mut(&table_id) else {
            continue;
        };
        if stats.total_key_count == 0 {
            continue;
        }
        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
        // The key count is known exactly.
        stats.total_key_count = new_total_key_count;
        // Sizes are scaled by the same ratio as an approximation.
        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
    }
}

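/// Write-availability state of a compaction group, derived from its L0 shape by
/// `GroupStateValidator`; the payload carries a human-readable reason.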
#[derive(Debug, Clone)]
pub enum GroupState {
    Normal,

    /// Emergency state, carrying the reason.
    Emergency(String),
    /// Write-stop state, carrying the reason.
    WriteStop(String),
}

impl GroupState {
    pub fn is_write_stop(&self) -> bool {
        matches!(self, Self::WriteStop(_))
    }

    pub fn is_emergency(&self) -> bool {
        matches!(self, Self::Emergency(_))
    }

    pub fn reason(&self) -> Option<&str> {
        match self {
            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
            _ => None,
        }
    }
}

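/// Stateless checks that classify a group's `GroupState` from its L0 statistics against the
/// thresholds in `CompactionConfig`; write stop takes precedence over emergency.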
#[derive(Clone, Default)]
pub struct GroupStateValidator;

impl GroupStateValidator {
    pub fn write_stop_sub_level_count(
        level_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
        level_count > threshold
    }

    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
        l0_size
            > compaction_config
                .level0_stop_write_threshold_max_size
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
    }

    pub fn write_stop_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .level0_stop_write_threshold_max_sst_count
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
                as usize
    }

    pub fn emergency_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .emergency_level0_sst_file_count
                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
                as usize
    }

    pub fn emergency_l0_partition_count(
        last_l0_sub_level_partition_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        last_l0_sub_level_partition_count
            > compaction_config
                .emergency_level0_sub_level_partition
                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
                as usize
    }

    pub fn check_single_group_write_stop(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
                levels.l0.sub_levels.len(),
                compaction_config.level0_stop_write_threshold_sub_level_number
            ));
        }

        if Self::write_stop_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .level0_stop_write_threshold_max_sst_count
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
            ));
        }

        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
                levels.l0.total_file_size,
                compaction_config
                    .level0_stop_write_threshold_max_size
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
            ));
        }

        GroupState::Normal
    }

    pub fn check_single_group_emergency(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::emergency_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .emergency_level0_sst_file_count
                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
            ));
        }

        if Self::emergency_l0_partition_count(
            levels
                .l0
                .sub_levels
                .first()
                .map(|l| l.table_infos.len())
                .unwrap_or(0),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
                levels
                    .l0
                    .sub_levels
                    .first()
                    .map(|l| l.table_infos.len())
                    .unwrap_or(0),
                compaction_config
                    .emergency_level0_sub_level_partition
                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
            ));
        }

        GroupState::Normal
    }

    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
        let state = Self::check_single_group_write_stop(levels, compaction_config);
        if state.is_write_stop() {
            return state;
        }

        Self::check_single_group_emergency(levels, compaction_config)
    }
}