use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::Instant;

use anyhow::Context;
use compaction_event_loop::{
    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
    HummockCompactorDedicatedEventLoop,
};
use fail::fail_point;
use itertools::Itertools;
use parking_lot::Mutex;
use rand::rng as thread_rng;
use rand::seq::SliceRandom;
use risingwave_common::catalog::TableId;
use risingwave_common::config::meta::default::compaction_config;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::level::Levels;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
};
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockSstableId, HummockSstableObjectId,
    HummockVersionId, compact_task_to_string, statistics_compact_task,
};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
};
use thiserror_ext::AsReport;
use tokio::sync::RwLockWriteGuard;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;
use tracing::warn;

use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::{
    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
};
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::{
    HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
    trigger_local_table_stat,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::next_compaction_task_id;
use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
use crate::manager::META_NODE_ID;
use crate::model::BTreeMapTransaction;

pub mod compaction_event_loop;
pub mod compaction_group_manager;
pub mod compaction_group_schedule;

static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
    [
        TaskStatus::ManualCanceled,
        TaskStatus::SendFailCanceled,
        TaskStatus::AssignFailCanceled,
        TaskStatus::HeartbeatCanceled,
        TaskStatus::InvalidGroupCanceled,
        TaskStatus::NoAvailMemoryResourceCanceled,
        TaskStatus::NoAvailCpuResourceCanceled,
        TaskStatus::HeartbeatProgressCanceled,
    ]
    .into_iter()
    .collect()
});

type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);

fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
        HashMap::default();
    compaction_selectors.insert(
        compact_task::TaskType::Dynamic,
        Box::<DynamicLevelSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::SpaceReclaim,
        Box::<SpaceReclaimCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Ttl,
        Box::<TtlCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Tombstone,
        Box::<TombstoneCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::VnodeWatermark,
        Box::<VnodeWatermarkCompactionSelector>::default(),
    );
    compaction_selectors
}
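
// Usage sketch (hypothetical caller, not part of this module's API): fetch the
// selector registered for a given task type; `Dynamic` is always registered.
//
//     let mut selectors = init_selectors();
//     let selector = selectors
//         .get_mut(&compact_task::TaskType::Dynamic)
//         .expect("dynamic selector is always registered");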

impl HummockVersionTransaction<'_> {
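    /// Pre-applies a compact task to the current version delta in two phases:
    /// one `IntraLevelDelta` per input level removing the input SSTs, then a
    /// final delta inserting the sorted output SSTs into the target
    /// (sub-)level. Trivial-move tasks are additionally flagged on the delta.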
    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
        let mut version_delta = self.new_delta();
        let trivial_move = compact_task.is_trivial_move_task();
        version_delta.trivial_move = trivial_move;

        let group_deltas = &mut version_delta
            .group_deltas
            .entry(compact_task.compaction_group_id)
            .or_default()
            .group_deltas;
        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
            BTreeMap::default();

        for level in &compact_task.input_ssts {
            let level_idx = level.level_idx;

            removed_table_ids_map
                .entry(level_idx)
                .or_default()
                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
        }

        for (level_idx, removed_table_ids) in removed_table_ids_map {
            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
                level_idx,
                0, // sub level id is irrelevant when only removing SSTs
                removed_table_ids,
                vec![], // no SSTs inserted here
                0,      // vnode partition count left at default
                compact_task.compaction_group_version_id,
            ));

            group_deltas.push(group_delta);
        }

        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
            compact_task.target_level,
            compact_task.target_sub_level_id,
            HashSet::new(), // no SSTs removed here; inputs were removed above
            compact_task.sorted_output_ssts.clone(),
            compact_task.split_weight_by_vnode,
            compact_task.compaction_group_version_id,
        ));

        group_deltas.push(group_delta);
        version_delta.pre_apply();
    }
}

#[derive(Default)]
pub struct Compaction {
    /// Compaction tasks that have been assigned, keyed by task id.
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// `CompactStatus` of each compaction group.
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    pub _deterministic_mode: bool,
}

impl HummockManager {
    pub async fn get_assigned_compact_task_num(&self) -> u64 {
        self.compaction.read().await.compact_task_assignment.len() as u64
    }

    pub async fn list_compaction_status(
        &self,
    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
        let compaction = self.compaction.read().await;
        (
            compaction.compaction_statuses.values().map_into().collect(),
            compaction
                .compact_task_assignment
                .values()
                .cloned()
                .collect(),
        )
    }

    pub async fn get_compaction_scores(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<PickerInfo> {
        let (status, levels, group) = {
            let compaction = self.compaction.read().await;
            let versioning = self.versioning.read().await;
            let config_manager = self.compaction_group_manager.read().await;
            match (
                compaction.compaction_statuses.get(&compaction_group_id),
                versioning.current_version.levels.get(&compaction_group_id),
                config_manager.try_get_compaction_group_config(compaction_group_id),
            ) {
                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
                _ => {
                    return vec![];
                }
            }
        };
        let dynamic_level_core = DynamicLevelSelectorCore::new(
            group.compaction_config,
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
        ctx.score_levels
    }
}

impl HummockManager {
    pub fn compaction_event_loop(
        hummock_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let hummock_compaction_event_handler =
            HummockCompactionEventHandler::new(hummock_manager.clone());

        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
            hummock_manager.clone(),
            hummock_compaction_event_handler.clone(),
        );

        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));

        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
            hummock_manager.env.opts.clone(),
            hummock_compaction_event_handler,
            Some(event_tx),
        );

        let event_loop = HummockCompactionEventLoop::new(
            hummock_compaction_event_dispatcher,
            hummock_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }
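
    // Shutdown sketch (assumed caller behavior; names are illustrative): each
    // returned pair is a join handle plus the one-shot sender that stops its loop.
    //
    //     for (join_handle, shutdown_tx) in handles {
    //         let _ = shutdown_tx.send(());
    //         let _ = join_handle.await;
    //     }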

    pub fn add_compactor_stream(
        &self,
        context_id: u32,
        req_stream: Streaming<SubscribeCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

    pub async fn auto_pick_compaction_group_and_type(
        &self,
    ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                return Some((cg_id, pick_type));
            }
        }

        None
    }

    /// Similar to `auto_pick_compaction_group_and_type`, but returns all groups
    /// with a pending dynamic task at once, or a single group for the first
    /// non-dynamic task type encountered.
    async fn auto_pick_compaction_groups_and_type(
        &self,
    ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        let mut normal_groups = vec![];
        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                if pick_type == TaskType::Dynamic {
                    normal_groups.push(cg_id);
                } else if normal_groups.is_empty() {
                    return (vec![cg_id], pick_type);
                }
            }
        }
        (normal_groups, TaskType::Dynamic)
    }
}

impl HummockManager {
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        // Lock order: `compaction` before `versioning`, matching the report path.
        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // The group (and its config) may have been destroyed concurrently;
            // skip it if the config can no longer be found.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            let task_id = next_compaction_task_id(&self.env).await?;

            // Initialize the compact status for this group lazily.
            if !compaction_statuses.contains_key(&compaction_group_id) {
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            let can_trivial_move =
                matches!(selector.task_type(), TaskType::Dynamic | TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // Delete-key tombstones can only be GC'ed when compacting
                    // into the last level; otherwise they may still shadow
                    // older versions of the key in deeper levels.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                    );

                    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();

                    let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = version
                        .latest_version()
                        .safe_epoch_table_watermarks(&table_ids_to_be_compacted)
                        .into_iter()
                        .partition(|(_table_id, table_watermark)| {
                            matches!(
                                table_watermark.watermark_type,
                                WatermarkSerdeType::PkPrefix
                            )
                        });

                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;

                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            all_versioned_table_schemas.get(table_id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID,
                        },
                    );

                    pick_tasks.push(compact_task);
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }

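    /// Cancels a single compaction task. `task_status` must be one of the
    /// cancel statuses in `CANCEL_STATUS_SET`; the shared impl asserts this.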
    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore err")
        )));
        let ret = self
            .cancel_compact_task_impl(vec![task_id], task_status)
            .await?;
        Ok(ret[0])
    }

    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }

    async fn cancel_compact_task_impl(
        &self,
        task_ids: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        assert!(CANCEL_STATUS_SET.contains(&task_status));
        let tasks = task_ids
            .into_iter()
            .map(|task_id| ReportTask {
                task_id,
                task_status,
                sorted_output_ssts: vec![],
                table_stats_change: HashMap::default(),
                object_timestamps: HashMap::default(),
            })
            .collect_vec();
        let rets = self.report_compact_tasks(tasks).await?;
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(rets)
    }

    async fn get_compact_tasks(
        &self,
        mut compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));
        compaction_groups.shuffle(&mut thread_rng());
        let (mut tasks, groups) = self
            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
            .await?;
        tasks.retain(|task| {
            if task.task_status == TaskStatus::Success {
                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
                false
            } else {
                true
            }
        });
        Ok((tasks, groups))
    }

    pub async fn get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<Option<CompactTask>> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));

        let (normal_tasks, _) = self
            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
            .await?;
        for task in normal_tasks {
            if task.task_status != TaskStatus::Success {
                return Ok(Some(task));
            }
            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
        }
        Ok(None)
    }

    pub async fn manual_get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<Option<CompactTask>> {
        let mut selector: Box<dyn CompactionSelector> =
            Box::new(ManualCompactionSelector::new(manual_compaction_option));
        self.get_compact_task(compaction_group_id, &mut selector)
            .await
    }
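
    // Usage sketch (hypothetical; assumes `ManualCompactionOption` implements
    // `Default`): request a manual compaction task with default options.
    //
    //     let task = hummock_manager
    //         .manual_get_compact_task(group_id, ManualCompactionOption::default())
    //         .await?;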

    pub async fn report_compact_task(
        &self,
        task_id: u64,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
        object_timestamps: HashMap<HummockSstableObjectId, u64>,
    ) -> Result<bool> {
        let rets = self
            .report_compact_tasks(vec![ReportTask {
                task_id,
                task_status,
                sorted_output_ssts,
                table_stats_change: table_stats_change.unwrap_or_default(),
                object_timestamps,
            }])
            .await?;
        Ok(rets[0])
    }

    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
        let compaction_guard = self.compaction.write().await;
        let versioning_guard = self.versioning.write().await;

        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
            .await
    }

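    /// Finishes or cancels the given compaction tasks according to their
    /// reported `task_status`. Successful tasks are applied to the Hummock
    /// version and committed together with the updated stats; both write
    /// guards are taken by the caller and consumed here.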
    pub async fn report_compact_tasks_impl(
        &self,
        report_tasks: Vec<ReportTask>,
        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
    ) -> Result<Vec<bool>> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;
        let compaction: &mut Compaction = &mut compaction_guard;
        let start_time = Instant::now();
        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
        let mut rets = vec![false; report_tasks.len()];
        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
        let versioning: &mut Versioning = &mut versioning_guard;
        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");

        // Purge the compact statuses of compaction groups that no longer exist.
        for group_id in original_keys {
            if !versioning.current_version.levels.contains_key(&group_id) {
                compact_statuses.remove(group_id);
            }
        }
        let mut tasks = vec![];

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        let mut success_count = 0;
        for (idx, task) in report_tasks.into_iter().enumerate() {
            rets[idx] = true;
            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
                None => {
                    tracing::warn!("compact task {} not found", task.task_id);
                    rets[idx] = false;
                    continue;
                }
            };

            {
                compact_task.task_status = task.task_status;
                compact_task.sorted_output_ssts = task.sorted_output_ssts;
            }

            match compact_statuses.get_mut(compact_task.compaction_group_id) {
                Some(mut compact_status) => {
                    compact_status.report_compact_task(&compact_task);
                }
                None => {
                    // The group was destroyed after the task was assigned.
                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
                }
            }

            let is_success = if let TaskStatus::Success = compact_task.task_status {
                match self
                    .report_compaction_sanity_check(&task.object_timestamps)
                    .await
                {
                    Err(e) => {
                        warn!(
                            "failed to commit compaction task {} {}",
                            compact_task.task_id,
                            e.as_report()
                        );
                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
                        false
                    }
                    _ => {
                        let group = version
                            .latest_version()
                            .levels
                            .get(&compact_task.compaction_group_id)
                            .unwrap();
                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
                        if is_expired {
                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
                            warn!(
                                "The task may be expired because of group split, task:\n {:?}",
                                compact_task_to_string(&compact_task)
                            );
                        }
                        !is_expired
                    }
                }
            } else {
                false
            };
            if is_success {
                success_count += 1;
                version.apply_compact_task(&compact_task);
                if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version())
                {
                    self.metrics.version_stats.reset();
                    versioning.local_metrics.clear();
                }
                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
                trigger_local_table_stat(
                    &self.metrics,
                    &mut versioning.local_metrics,
                    &version_stats,
                    &task.table_stats_change,
                );
            }
            tasks.push(compact_task);
        }
        if success_count > 0 {
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;

            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_report_task"])
                .observe(success_count as f64);
        } else {
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment
            )?;
        }

        let mut success_groups = vec![];
        for compact_task in &tasks {
            self.compactor_manager
                .remove_task_heartbeat(compact_task.task_id);
            tracing::trace!(
                "Reported compaction task. {}. cost time: {:?}",
                compact_task_to_string(compact_task),
                start_time.elapsed(),
            );

            if !deterministic_mode
                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
            {
                // Schedule a follow-up dynamic compaction for the group.
                self.try_send_compaction_request(
                    compact_task.compaction_group_id,
                    compact_task::TaskType::Dynamic,
                );
            }

            if compact_task.task_status == TaskStatus::Success {
                success_groups.push(compact_task.compaction_group_id);
            }
        }

        trigger_compact_tasks_stat(
            &self.metrics,
            &tasks,
            &compaction.compaction_statuses,
            &versioning_guard.current_version,
        );
        drop(versioning_guard);
        if !success_groups.is_empty() {
            self.try_update_write_limits(&success_groups).await;
        }
        Ok(rets)
    }

    /// Triggers dynamic compaction on the given compaction groups; used on the
    /// deterministic test path. The base version id is currently unused.
    pub async fn trigger_compaction_deterministic(
        &self,
        _base_version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        self.on_current_version(|old_version| {
            tracing::info!(
                "Trigger compaction for version {}, groups {:?}",
                old_version.id,
                compaction_groups
            );
        })
        .await;

        if compaction_groups.is_empty() {
            return Ok(());
        }
        for compaction_group in compaction_groups {
            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
        }
        Ok(())
    }

    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<()> {
        let start_time = Instant::now();

        // 1. Select a compactor.
        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction: no compactor is available");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction: no compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        // 2. Get a manual compaction task.
        let compact_task = self
            .manual_get_compact_task(compaction_group, manual_compaction_option)
            .await;
        let compact_task = match compact_task {
            Ok(Some(compact_task)) => compact_task,
            Ok(None) => {
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction: no compaction task is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };

        // 3. Send the task to the selected compactor.
        let compact_task_string = compact_task_to_string(&compact_task);
        compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })?;

        tracing::info!(
            "Trigger manual compaction task. {}. cost time: {:?}",
            &compact_task_string,
            start_time.elapsed(),
        );

        Ok(())
    }

    pub fn try_send_compaction_request(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) -> bool {
        match self
            .compaction_state
            .try_sched_compaction(compaction_group, task_type)
        {
            Ok(_) => true,
            Err(e) => {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to send compaction request for compaction group {}",
                    compaction_group,
                );
                false
            }
        }
    }
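
    // Note: requests are deduplicated per (group, task type) by `CompactionState`
    // until `unschedule` is called, so callers can fire and forget:
    //
    //     let _ = manager.try_send_compaction_request(group_id, TaskType::Dynamic);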

    pub(crate) fn calculate_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
    ) {
        // Vnode partitioning only applies to tasks that write to the base
        // level (or L0); skip tasks targeting deeper levels.
        if compact_task.target_level > compact_task.base_level {
            return;
        }
        if compaction_config.split_weight_by_vnode > 0 {
            for table_id in &compact_task.existing_table_ids {
                compact_task
                    .table_vnode_partition
                    .insert(*table_id, compact_task.split_weight_by_vnode);
            }
        } else {
            let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
            let mut existing_table_ids: HashSet<TableId> = HashSet::default();
            for input_ssts in &compact_task.input_ssts {
                for sst in &input_ssts.table_infos {
                    existing_table_ids.extend(sst.table_ids.iter());
                    for table_id in &sst.table_ids {
                        *table_size_info.entry(*table_id).or_default() +=
                            sst.sst_size / (sst.table_ids.len() as u64);
                    }
                }
            }
            compact_task
                .existing_table_ids
                .retain(|table_id| existing_table_ids.contains(table_id));

            let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
            let default_partition_count = self.env.opts.partition_vnode_count;
            let compact_task_table_size_partition_threshold_low = self
                .env
                .opts
                .compact_task_table_size_partition_threshold_low;
            let compact_task_table_size_partition_threshold_high = self
                .env
                .opts
                .compact_task_table_size_partition_threshold_high;
            let table_write_throughput_statistic_manager =
                self.table_write_throughput_statistic_manager.read();
            let timestamp = chrono::Utc::now().timestamp();
            for (table_id, compact_table_size) in table_size_info {
                let write_throughput = table_write_throughput_statistic_manager
                    .get_table_throughput_descending(table_id, timestamp)
                    .peekable()
                    .peek()
                    .map(|item| item.throughput)
                    .unwrap_or(0);
                if compact_table_size > compact_task_table_size_partition_threshold_high
                    && default_partition_count > 0
                {
                    compact_task
                        .table_vnode_partition
                        .insert(table_id, default_partition_count);
                } else if (compact_table_size > compact_task_table_size_partition_threshold_low
                    || (write_throughput > self.env.opts.table_high_write_throughput_threshold
                        && compact_table_size > compaction_config.target_file_size_base))
                    && hybrid_vnode_count > 0
                {
                    // Partition large or high-throughput tables by the hybrid vnode count.
                    compact_task
                        .table_vnode_partition
                        .insert(table_id, hybrid_vnode_count);
                } else if compact_table_size > compaction_config.target_file_size_base {
                    compact_task.table_vnode_partition.insert(table_id, 1);
                }
            }
            compact_task
                .table_vnode_partition
                .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
        }
    }
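
    // Worked example (hypothetical option values): with threshold_high = 128 MiB,
    // threshold_low = 32 MiB, target_file_size_base = 4 MiB, partition_vnode_count
    // = 8 and hybrid_partition_node_count = 4, a 200 MiB table gets 8 partitions,
    // a 40 MiB table gets 4, a 6 MiB table gets 1, and a 2 MiB table stays
    // unpartitioned.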

    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockManager {
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction_guard = self.compaction.read().await;
        let assignment_ref = &compaction_guard.compact_task_assignment;
        assignment_ref.get(&task_id).cloned()
    }

    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let mut guard = self.compaction.write().await;
            guard.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0,
                },
            );
        }

        self.report_compact_tasks(vec![ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        }])
        .await?;
        Ok(())
    }
}

#[derive(Debug, Default)]
pub struct CompactionState {
    scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
}

impl CompactionState {
    pub fn new() -> Self {
        Self {
            scheduled: Default::default(),
        }
    }

    /// Enqueues only if the target is not yet in the queue.
    pub fn try_sched_compaction(
        &self,
        compaction_group: CompactionGroupId,
        task_type: TaskType,
    ) -> std::result::Result<bool, SendError<CompactionRequestChannelItem>> {
        let mut guard = self.scheduled.lock();
        let key = (compaction_group, task_type);
        if guard.contains(&key) {
            return Ok(false);
        }
        guard.insert(key);
        Ok(true)
    }
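
    // Lifecycle sketch: a (group, task type) pair stays scheduled from
    // `try_sched_compaction` until `unschedule`, deduplicating requests:
    //
    //     assert!(state.try_sched_compaction(group_id, TaskType::Dynamic)?);  // newly scheduled
    //     assert!(!state.try_sched_compaction(group_id, TaskType::Dynamic)?); // deduplicated
    //     state.unschedule(group_id, TaskType::Dynamic);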

    pub fn unschedule(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) {
        self.scheduled.lock().remove(&(compaction_group, task_type));
    }

    /// Returns the scheduled task type with the highest priority, in fixed
    /// order: Dynamic > SpaceReclaim > Ttl > Tombstone > VnodeWatermark.
    pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
        let guard = self.scheduled.lock();
        if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
            Some(compact_task::TaskType::Dynamic)
        } else if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
            Some(compact_task::TaskType::SpaceReclaim)
        } else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
            Some(compact_task::TaskType::Ttl)
        } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
            Some(compact_task::TaskType::Tombstone)
        } else if guard.contains(&(group, compact_task::TaskType::VnodeWatermark)) {
            Some(compact_task::TaskType::VnodeWatermark)
        } else {
            None
        }
    }
}

impl Compaction {
    pub fn get_compact_task_assignments_by_group_id(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<CompactTaskAssignment> {
        self.compact_task_assignment
            .iter()
            .filter_map(|(_, assignment)| {
                if assignment
                    .compact_task
                    .as_ref()
                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
                {
                    Some(CompactTaskAssignment {
                        compact_task: assignment.compact_task.clone(),
                        context_id: assignment.context_id,
                    })
                } else {
                    None
                }
            })
            .collect()
    }
}

#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    pub group_id: CompactionGroupId,
    pub group_size: u64,
    pub table_statistic: BTreeMap<StateTableId, u64>,
    pub compaction_group_config: CompactionGroup,
}

fn update_table_stats_for_vnode_watermark_trivial_reclaim(
    table_stats: &mut PbTableStatsMap,
    task: &CompactTask,
) {
    if task.task_type != TaskType::VnodeWatermark {
        return;
    }
    let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
        assert_eq!(s.table_ids.len(), 1);
        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
        *e += s.total_key_count;
    }
    for (table_id, delete_count) in deleted_table_keys {
        let Some(stats) = table_stats.get_mut(&table_id.as_raw_id()) else {
            continue;
        };
        if stats.total_key_count == 0 {
            continue;
        }
        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
        // The key count is updated accurately.
        stats.total_key_count = new_total_key_count;
        // Key and value sizes are scaled approximately by the same ratio.
        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
    }
}
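
// Worked example: a table with total_key_count = 100 whose VnodeWatermark task
// reclaims SSTs holding 30 keys ends up with total_key_count = 70 and its key
// and value sizes scaled by the ratio 0.7 (rounded up).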

#[derive(Debug, Clone)]
pub enum GroupState {
    /// The group is in a normal state.
    Normal,

    /// The group is in an emergency state, with the reason attached.
    Emergency(String),

    /// The group is in a write-stop state, with the reason attached.
    WriteStop(String),
}

impl GroupState {
    pub fn is_write_stop(&self) -> bool {
        matches!(self, Self::WriteStop(_))
    }

    pub fn is_emergency(&self) -> bool {
        matches!(self, Self::Emergency(_))
    }

    pub fn reason(&self) -> Option<&str> {
        match self {
            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
            _ => None,
        }
    }
}

#[derive(Clone, Default)]
pub struct GroupStateValidator;

impl GroupStateValidator {
    pub fn write_stop_sub_level_count(
        level_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
        level_count > threshold
    }

    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
        l0_size
            > compaction_config
                .level0_stop_write_threshold_max_size
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
    }

    pub fn write_stop_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .level0_stop_write_threshold_max_sst_count
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
                as usize
    }

    pub fn emergency_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .emergency_level0_sst_file_count
                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
                as usize
    }

    pub fn emergency_l0_partition_count(
        last_l0_sub_level_partition_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        last_l0_sub_level_partition_count
            > compaction_config
                .emergency_level0_sub_level_partition
                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
                as usize
    }

    pub fn check_single_group_write_stop(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
                levels.l0.sub_levels.len(),
                compaction_config.level0_stop_write_threshold_sub_level_number
            ));
        }

        if Self::write_stop_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .level0_stop_write_threshold_max_sst_count
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
            ));
        }

        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
                levels.l0.total_file_size,
                compaction_config
                    .level0_stop_write_threshold_max_size
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
            ));
        }

        GroupState::Normal
    }

    pub fn check_single_group_emergency(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::emergency_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .emergency_level0_sst_file_count
                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
            ));
        }

        if Self::emergency_l0_partition_count(
            levels
                .l0
                .sub_levels
                .first()
                .map(|l| l.table_infos.len())
                .unwrap_or(0),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
                levels
                    .l0
                    .sub_levels
                    .first()
                    .map(|l| l.table_infos.len())
                    .unwrap_or(0),
                compaction_config
                    .emergency_level0_sub_level_partition
                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
            ));
        }

        GroupState::Normal
    }

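    /// Combined check: write-stop conditions take precedence; otherwise the
    /// emergency conditions are evaluated, falling back to `Normal`.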
    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
        let state = Self::check_single_group_write_stop(levels, compaction_config);
        if state.is_write_stop() {
            return state;
        }

        Self::check_single_group_emergency(levels, compaction_config)
    }
}