1use std::collections::{BTreeMap, HashMap, HashSet};
30use std::sync::{Arc, LazyLock};
31use std::time::Instant;
32
33use anyhow::Context;
34use compaction_event_loop::{
35 HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
36 HummockCompactorDedicatedEventLoop,
37};
38use fail::fail_point;
39use itertools::Itertools;
40use parking_lot::Mutex;
41use rand::rng as thread_rng;
42use rand::seq::SliceRandom;
43use risingwave_common::config::meta::default::compaction_config;
44use risingwave_common::util::epoch::Epoch;
45use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
46use risingwave_hummock_sdk::compaction_group::StateTableId;
47use risingwave_hummock_sdk::key_range::KeyRange;
48use risingwave_hummock_sdk::level::Levels;
49use risingwave_hummock_sdk::sstable_info::SstableInfo;
50use risingwave_hummock_sdk::table_stats::{
51 PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
52};
53use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
54use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
55use risingwave_hummock_sdk::{
56 CompactionGroupId, HummockCompactionTaskId, HummockSstableId, HummockSstableObjectId,
57 HummockVersionId, compact_task_to_string, statistics_compact_task,
58};
59use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
60use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
61use risingwave_pb::hummock::{
62 CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
63 SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
64};
65use thiserror_ext::AsReport;
66use tokio::sync::RwLockWriteGuard;
67use tokio::sync::mpsc::UnboundedReceiver;
68use tokio::sync::mpsc::error::SendError;
69use tokio::sync::oneshot::Sender;
70use tokio::task::JoinHandle;
71use tonic::Streaming;
72use tracing::warn;
73
74use crate::hummock::compaction::selector::level_selector::PickerInfo;
75use crate::hummock::compaction::selector::{
76 DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
77 ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
78 TtlCompactionSelector, VnodeWatermarkCompactionSelector,
79};
80use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
81use crate::hummock::error::{Error, Result};
82use crate::hummock::manager::transaction::{
83 HummockVersionStatsTransaction, HummockVersionTransaction,
84};
85use crate::hummock::manager::versioning::Versioning;
86use crate::hummock::metrics_utils::{
87 build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
88 trigger_local_table_stat,
89};
90use crate::hummock::model::CompactionGroup;
91use crate::hummock::sequence::next_compaction_task_id;
92use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
93use crate::manager::META_NODE_ID;
94use crate::model::BTreeMapTransaction;
95
96pub mod compaction_event_loop;
97pub mod compaction_group_manager;
98pub mod compaction_group_schedule;
99
100static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
101 [
102 TaskStatus::ManualCanceled,
103 TaskStatus::SendFailCanceled,
104 TaskStatus::AssignFailCanceled,
105 TaskStatus::HeartbeatCanceled,
106 TaskStatus::InvalidGroupCanceled,
107 TaskStatus::NoAvailMemoryResourceCanceled,
108 TaskStatus::NoAvailCpuResourceCanceled,
109 TaskStatus::HeartbeatProgressCanceled,
110 ]
111 .into_iter()
112 .collect()
113});
114
115type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);
116
117fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
118 let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
119 HashMap::default();
120 compaction_selectors.insert(
121 compact_task::TaskType::Dynamic,
122 Box::<DynamicLevelSelector>::default(),
123 );
124 compaction_selectors.insert(
125 compact_task::TaskType::SpaceReclaim,
126 Box::<SpaceReclaimCompactionSelector>::default(),
127 );
128 compaction_selectors.insert(
129 compact_task::TaskType::Ttl,
130 Box::<TtlCompactionSelector>::default(),
131 );
132 compaction_selectors.insert(
133 compact_task::TaskType::Tombstone,
134 Box::<TombstoneCompactionSelector>::default(),
135 );
136 compaction_selectors.insert(
137 compact_task::TaskType::VnodeWatermark,
138 Box::<VnodeWatermarkCompactionSelector>::default(),
139 );
140 compaction_selectors
141}
142
143impl HummockVersionTransaction<'_> {
144 fn apply_compact_task(&mut self, compact_task: &CompactTask) {
145 let mut version_delta = self.new_delta();
146 let trivial_move = compact_task.is_trivial_move_task();
147 version_delta.trivial_move = trivial_move;
148
149 let group_deltas = &mut version_delta
150 .group_deltas
151 .entry(compact_task.compaction_group_id)
152 .or_default()
153 .group_deltas;
154 let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
155 BTreeMap::default();
156
157 for level in &compact_task.input_ssts {
158 let level_idx = level.level_idx;
159
160 removed_table_ids_map
161 .entry(level_idx)
162 .or_default()
163 .extend(level.table_infos.iter().map(|sst| sst.sst_id));
164 }
165
166 for (level_idx, removed_table_ids) in removed_table_ids_map {
167 let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
168 level_idx,
169 0, removed_table_ids,
171 vec![], 0, compact_task.compaction_group_version_id,
174 ));
175
176 group_deltas.push(group_delta);
177 }
178
179 let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
180 compact_task.target_level,
181 compact_task.target_sub_level_id,
182 HashSet::new(), compact_task.sorted_output_ssts.clone(),
184 compact_task.split_weight_by_vnode,
185 compact_task.compaction_group_version_id,
186 ));
187
188 group_deltas.push(group_delta);
189 version_delta.pre_apply();
190 }
191}
192
/// Compaction bookkeeping owned by `HummockManager` (accessed through `self.compaction`).
#[derive(Default)]
pub struct Compaction {
    /// Compaction tasks handed out and not yet reported, keyed by task id. Entries are inserted
    /// when a non-trivial task is picked and removed when the task is reported.
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// Per-compaction-group `CompactStatus` (including its level handlers), created lazily when
    /// a group is first picked for compaction.
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    // NOTE(review): not read in this module; deterministic-mode checks here use
    // `self.env.opts.compaction_deterministic_test` instead — confirm whether this field is still needed.
    pub _deterministic_mode: bool,
}
202
203impl HummockManager {
204 pub async fn get_assigned_compact_task_num(&self) -> u64 {
205 self.compaction.read().await.compact_task_assignment.len() as u64
206 }
207
208 pub async fn list_compaction_status(
209 &self,
210 ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
211 let compaction = self.compaction.read().await;
212 (
213 compaction.compaction_statuses.values().map_into().collect(),
214 compaction
215 .compact_task_assignment
216 .values()
217 .cloned()
218 .collect(),
219 )
220 }
221
222 pub async fn get_compaction_scores(
223 &self,
224 compaction_group_id: CompactionGroupId,
225 ) -> Vec<PickerInfo> {
226 let (status, levels, group) = {
227 let compaction = self.compaction.read().await;
228 let versioning = self.versioning.read().await;
229 let config_manager = self.compaction_group_manager.read().await;
230 match (
231 compaction.compaction_statuses.get(&compaction_group_id),
232 versioning.current_version.levels.get(&compaction_group_id),
233 config_manager.try_get_compaction_group_config(compaction_group_id),
234 ) {
235 (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
236 _ => {
237 return vec![];
238 }
239 }
240 };
241 let dynamic_level_core = DynamicLevelSelectorCore::new(
242 group.compaction_config,
243 Arc::new(CompactionDeveloperConfig::default()),
244 );
245 let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
246 ctx.score_levels
247 }
248}
249
250impl HummockManager {
251 pub fn compaction_event_loop(
252 hummock_manager: Arc<Self>,
253 compactor_streams_change_rx: UnboundedReceiver<(
254 u32,
255 Streaming<SubscribeCompactionEventRequest>,
256 )>,
257 ) -> Vec<(JoinHandle<()>, Sender<()>)> {
258 let mut join_handle_vec = Vec::default();
259
260 let hummock_compaction_event_handler =
261 HummockCompactionEventHandler::new(hummock_manager.clone());
262
263 let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
264 hummock_manager.clone(),
265 hummock_compaction_event_handler.clone(),
266 );
267
268 let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
269 join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));
270
271 let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
272 hummock_manager.env.opts.clone(),
273 hummock_compaction_event_handler,
274 Some(event_tx),
275 );
276
277 let event_loop = HummockCompactionEventLoop::new(
278 hummock_compaction_event_dispatcher,
279 hummock_manager.metrics.clone(),
280 compactor_streams_change_rx,
281 );
282
283 let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
284 join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
285
286 join_handle_vec
287 }
288
289 pub fn add_compactor_stream(
290 &self,
291 context_id: u32,
292 req_stream: Streaming<SubscribeCompactionEventRequest>,
293 ) {
294 self.compactor_streams_change_tx
295 .send((context_id, req_stream))
296 .unwrap();
297 }
298
299 pub async fn auto_pick_compaction_group_and_type(
300 &self,
301 ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
302 let mut compaction_group_ids = self.compaction_group_ids().await;
303 compaction_group_ids.shuffle(&mut thread_rng());
304
305 for cg_id in compaction_group_ids {
306 if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
307 return Some((cg_id, pick_type));
308 }
309 }
310
311 None
312 }
313
314 async fn auto_pick_compaction_groups_and_type(
317 &self,
318 ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
319 let mut compaction_group_ids = self.compaction_group_ids().await;
320 compaction_group_ids.shuffle(&mut thread_rng());
321
322 let mut normal_groups = vec![];
323 for cg_id in compaction_group_ids {
324 if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
325 if pick_type == TaskType::Dynamic {
326 normal_groups.push(cg_id);
327 } else if normal_groups.is_empty() {
328 return (vec![cg_id], pick_type);
329 }
330 }
331 }
332 (normal_groups, TaskType::Dynamic)
333 }
334}
335
336impl HummockManager {
    /// Picks compaction tasks for the given compaction groups using `selector`.
    ///
    /// Groups are visited in the given order until `max_select_count` non-trivial tasks have
    /// been picked. Groups that are absent from the latest version or have no config are skipped.
    /// Per group:
    /// * trivial-reclaim tasks, and trivial-move tasks when the selector type is `Dynamic` or
    ///   `Emergency`, are completed immediately: marked `Success` and applied to the version.
    ///   Picking stops entirely once `max_trivial_move_task_count_per_loop` of them exist.
    /// * at most one non-trivial task is picked. It is enriched (vnode partition, watermarks,
    ///   table schemas), then recorded in `compact_task_assignment` with `META_NODE_ID` as its
    ///   context.
    ///
    /// Returns the picked non-trivial tasks followed by the trivial tasks (status `Success`),
    /// plus the groups for which no non-trivial task was picked.
    ///
    /// # Errors
    /// Propagates failures from fetching versioned table schemas, allocating a task id, and
    /// committing the transactions.
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        // Lock order: `compaction` first, then `versioning`.
        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        // Table schemas are only fetched when dropped-column reclaim is enabled; otherwise no
        // task gets `table_schemas` attached.
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            // NOTE(review): one id is allocated per group, and every task produced by the
            // `while let` loop below (trivial tasks included) reuses it.
            let task_id = next_compaction_task_id(&self.env).await?;

            // Lazily create the group's compaction status on first use.
            if !compaction_statuses.contains_key(&compaction_group_id) {
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            // Trivial moves are only completed inline for Dynamic / Emergency selectors;
            // trivial reclaims are completed inline regardless.
            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .map(|table_id| table_id.table_id)
                .collect();

            let mut table_id_to_option: HashMap<u32, _> = HashMap::default();

            // Copy this group's table options out under a short-lived read guard.
            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                // Numeric encoding of the selector's algorithm name: Lz4 -> 1, Zstd -> 2,
                // anything else -> 0 (presumably the protobuf compression codes — confirm).
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // Delete keys may be dropped only when compacting into the group's last level.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    // Complete the trivial task in place: release it from the status and apply it
                    // to the version immediately, without assigning it to a compactor.
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        // A trivial move outputs its first input level's SSTs unchanged.
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    // NOTE(review): leaving via `'outside` skips this group's `unschedule_groups`
                    // check and the trailing `stats.report_to_metrics` call below.
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                    );

                    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();

                    // Split the safe-epoch watermarks into PkPrefix and all other watermark types.
                    let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = version
                        .latest_version()
                        .safe_epoch_table_watermarks(&table_ids_to_be_compacted)
                        .into_iter()
                        .partition(|(_table_id, table_watermarke)| {
                            matches!(
                                table_watermarke.watermark_type,
                                WatermarkSerdeType::PkPrefix
                            )
                        });

                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;

                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            let id = (*table_id).try_into().unwrap();
                            all_versioned_table_schemas.get(&id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID,
                        },
                    );

                    pick_tasks.push(compact_task);
                    // At most one non-trivial task per group per call.
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            // The group is reported as unscheduled when the last picked task is not from it,
            // i.e. no non-trivial task was picked for this group.
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            // Trivial tasks changed the version and stats, so those transactions are committed too.
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // No version change: release the versioning lock before committing statuses and
            // assignments.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        // Post-commit, lock-free work: start heartbeats and record per-task metrics.
        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // Already `Pending` since construction; set again before computing statistics.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        // Callers can tell trivial tasks apart by their `Success` status.
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }
724
725 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
727 fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
728 anyhow::anyhow!("failpoint metastore err")
729 )));
730 let ret = self
731 .cancel_compact_task_impl(vec![task_id], task_status)
732 .await?;
733 Ok(ret[0])
734 }
735
736 pub async fn cancel_compact_tasks(
737 &self,
738 tasks: Vec<u64>,
739 task_status: TaskStatus,
740 ) -> Result<Vec<bool>> {
741 self.cancel_compact_task_impl(tasks, task_status).await
742 }
743
744 async fn cancel_compact_task_impl(
745 &self,
746 task_ids: Vec<u64>,
747 task_status: TaskStatus,
748 ) -> Result<Vec<bool>> {
749 assert!(CANCEL_STATUS_SET.contains(&task_status));
750 let tasks = task_ids
751 .into_iter()
752 .map(|task_id| ReportTask {
753 task_id,
754 task_status,
755 sorted_output_ssts: vec![],
756 table_stats_change: HashMap::default(),
757 object_timestamps: HashMap::default(),
758 })
759 .collect_vec();
760 let rets = self.report_compact_tasks(tasks).await?;
761 #[cfg(test)]
762 {
763 self.check_state_consistency().await;
764 }
765 Ok(rets)
766 }
767
768 async fn get_compact_tasks(
769 &self,
770 mut compaction_groups: Vec<CompactionGroupId>,
771 max_select_count: usize,
772 selector: &mut Box<dyn CompactionSelector>,
773 ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
774 fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
775 anyhow::anyhow!("failpoint metastore error")
776 )));
777 compaction_groups.shuffle(&mut thread_rng());
778 let (mut tasks, groups) = self
779 .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
780 .await?;
781 tasks.retain(|task| {
782 if task.task_status == TaskStatus::Success {
783 debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
784 false
785 } else {
786 true
787 }
788 });
789 Ok((tasks, groups))
790 }
791
792 pub async fn get_compact_task(
793 &self,
794 compaction_group_id: CompactionGroupId,
795 selector: &mut Box<dyn CompactionSelector>,
796 ) -> Result<Option<CompactTask>> {
797 fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
798 anyhow::anyhow!("failpoint metastore error")
799 )));
800
801 let (normal_tasks, _) = self
802 .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
803 .await?;
804 for task in normal_tasks {
805 if task.task_status != TaskStatus::Success {
806 return Ok(Some(task));
807 }
808 debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
809 }
810 Ok(None)
811 }
812
813 pub async fn manual_get_compact_task(
814 &self,
815 compaction_group_id: CompactionGroupId,
816 manual_compaction_option: ManualCompactionOption,
817 ) -> Result<Option<CompactTask>> {
818 let mut selector: Box<dyn CompactionSelector> =
819 Box::new(ManualCompactionSelector::new(manual_compaction_option));
820 self.get_compact_task(compaction_group_id, &mut selector)
821 .await
822 }
823
824 pub async fn report_compact_task(
825 &self,
826 task_id: u64,
827 task_status: TaskStatus,
828 sorted_output_ssts: Vec<SstableInfo>,
829 table_stats_change: Option<PbTableStatsMap>,
830 object_timestamps: HashMap<HummockSstableObjectId, u64>,
831 ) -> Result<bool> {
832 let rets = self
833 .report_compact_tasks(vec![ReportTask {
834 task_id,
835 task_status,
836 sorted_output_ssts,
837 table_stats_change: table_stats_change.unwrap_or_default(),
838 object_timestamps,
839 }])
840 .await?;
841 Ok(rets[0])
842 }
843
844 pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
845 let compaction_guard = self.compaction.write().await;
846 let versioning_guard = self.versioning.write().await;
847
848 self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
849 .await
850 }
851
852 pub async fn report_compact_tasks_impl(
860 &self,
861 report_tasks: Vec<ReportTask>,
862 mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
863 mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
864 ) -> Result<Vec<bool>> {
865 let deterministic_mode = self.env.opts.compaction_deterministic_test;
866 let compaction: &mut Compaction = &mut compaction_guard;
867 let start_time = Instant::now();
868 let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
869 let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
870 let mut rets = vec![false; report_tasks.len()];
871 let mut compact_task_assignment =
872 BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
873 let versioning: &mut Versioning = &mut versioning_guard;
875 let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");
876
877 for group_id in original_keys {
879 if !versioning.current_version.levels.contains_key(&group_id) {
880 compact_statuses.remove(group_id);
881 }
882 }
883 let mut tasks = vec![];
884
885 let mut version = HummockVersionTransaction::new(
886 &mut versioning.current_version,
887 &mut versioning.hummock_version_deltas,
888 self.env.notification_manager(),
889 None,
890 &self.metrics,
891 );
892
893 if deterministic_mode {
894 version.disable_apply_to_txn();
895 }
896
897 let mut version_stats = HummockVersionStatsTransaction::new(
898 &mut versioning.version_stats,
899 self.env.notification_manager(),
900 );
901 let mut success_count = 0;
902 for (idx, task) in report_tasks.into_iter().enumerate() {
903 rets[idx] = true;
904 let mut compact_task = match compact_task_assignment.remove(task.task_id) {
905 Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
906 None => {
907 tracing::warn!("{}", format!("compact task {} not found", task.task_id));
908 rets[idx] = false;
909 continue;
910 }
911 };
912
913 {
914 compact_task.task_status = task.task_status;
916 compact_task.sorted_output_ssts = task.sorted_output_ssts;
917 }
918
919 match compact_statuses.get_mut(compact_task.compaction_group_id) {
920 Some(mut compact_status) => {
921 compact_status.report_compact_task(&compact_task);
922 }
923 None => {
924 compact_task.task_status = TaskStatus::InvalidGroupCanceled;
930 }
931 }
932
933 let is_success = if let TaskStatus::Success = compact_task.task_status {
934 match self
935 .report_compaction_sanity_check(&task.object_timestamps)
936 .await
937 {
938 Err(e) => {
939 warn!(
940 "failed to commit compaction task {} {}",
941 compact_task.task_id,
942 e.as_report()
943 );
944 compact_task.task_status = TaskStatus::RetentionTimeRejected;
945 false
946 }
947 _ => {
948 let group = version
949 .latest_version()
950 .levels
951 .get(&compact_task.compaction_group_id)
952 .unwrap();
953 let is_expired = compact_task.is_expired(group.compaction_group_version_id);
954 if is_expired {
955 compact_task.task_status = TaskStatus::InputOutdatedCanceled;
956 warn!(
957 "The task may be expired because of group split, task:\n {:?}",
958 compact_task_to_string(&compact_task)
959 );
960 }
961 !is_expired
962 }
963 }
964 } else {
965 false
966 };
967 if is_success {
968 success_count += 1;
969 version.apply_compact_task(&compact_task);
970 if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version())
971 {
972 self.metrics.version_stats.reset();
973 versioning.local_metrics.clear();
974 }
975 add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
976 trigger_local_table_stat(
977 &self.metrics,
978 &mut versioning.local_metrics,
979 &version_stats,
980 &task.table_stats_change,
981 );
982 }
983 tasks.push(compact_task);
984 }
985 if success_count > 0 {
986 commit_multi_var!(
987 self.meta_store_ref(),
988 compact_statuses,
989 compact_task_assignment,
990 version,
991 version_stats
992 )?;
993
994 self.metrics
995 .compact_task_batch_count
996 .with_label_values(&["batch_report_task"])
997 .observe(success_count as f64);
998 } else {
999 commit_multi_var!(
1001 self.meta_store_ref(),
1002 compact_statuses,
1003 compact_task_assignment
1004 )?;
1005 }
1006
1007 let mut success_groups = vec![];
1008 for compact_task in &tasks {
1009 self.compactor_manager
1010 .remove_task_heartbeat(compact_task.task_id);
1011 tracing::trace!(
1012 "Reported compaction task. {}. cost time: {:?}",
1013 compact_task_to_string(compact_task),
1014 start_time.elapsed(),
1015 );
1016
1017 if !deterministic_mode
1018 && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
1019 || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
1020 {
1021 self.try_send_compaction_request(
1023 compact_task.compaction_group_id,
1024 compact_task::TaskType::Dynamic,
1025 );
1026 }
1027
1028 if compact_task.task_status == TaskStatus::Success {
1029 success_groups.push(compact_task.compaction_group_id);
1030 }
1031 }
1032
1033 trigger_compact_tasks_stat(
1034 &self.metrics,
1035 &tasks,
1036 &compaction.compaction_statuses,
1037 &versioning_guard.current_version,
1038 );
1039 drop(versioning_guard);
1040 if !success_groups.is_empty() {
1041 self.try_update_write_limits(&success_groups).await;
1042 }
1043 Ok(rets)
1044 }
1045
1046 pub async fn trigger_compaction_deterministic(
1049 &self,
1050 _base_version_id: HummockVersionId,
1051 compaction_groups: Vec<CompactionGroupId>,
1052 ) -> Result<()> {
1053 self.on_current_version(|old_version| {
1054 tracing::info!(
1055 "Trigger compaction for version {}, groups {:?}",
1056 old_version.id,
1057 compaction_groups
1058 );
1059 })
1060 .await;
1061
1062 if compaction_groups.is_empty() {
1063 return Ok(());
1064 }
1065 for compaction_group in compaction_groups {
1066 self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1067 }
1068 Ok(())
1069 }
1070
1071 pub async fn trigger_manual_compaction(
1072 &self,
1073 compaction_group: CompactionGroupId,
1074 manual_compaction_option: ManualCompactionOption,
1075 ) -> Result<()> {
1076 let start_time = Instant::now();
1077
1078 let compactor = match self.compactor_manager.next_compactor() {
1080 Some(compactor) => compactor,
1081 None => {
1082 tracing::warn!("trigger_manual_compaction No compactor is available.");
1083 return Err(anyhow::anyhow!(
1084 "trigger_manual_compaction No compactor is available. compaction_group {}",
1085 compaction_group
1086 )
1087 .into());
1088 }
1089 };
1090
1091 let compact_task = self
1093 .manual_get_compact_task(compaction_group, manual_compaction_option)
1094 .await;
1095 let compact_task = match compact_task {
1096 Ok(Some(compact_task)) => compact_task,
1097 Ok(None) => {
1098 return Err(anyhow::anyhow!(
1100 "trigger_manual_compaction No compaction_task is available. compaction_group {}",
1101 compaction_group
1102 )
1103 .into());
1104 }
1105 Err(err) => {
1106 tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
1107
1108 return Err(anyhow::anyhow!(err)
1109 .context(format!(
1110 "Failed to get compaction task for compaction_group {}",
1111 compaction_group,
1112 ))
1113 .into());
1114 }
1115 };
1116
1117 let compact_task_string = compact_task_to_string(&compact_task);
1119 compactor
1121 .send_event(ResponseEvent::CompactTask(compact_task.into()))
1122 .with_context(|| {
1123 format!(
1124 "Failed to trigger compaction task for compaction_group {}",
1125 compaction_group,
1126 )
1127 })?;
1128
1129 tracing::info!(
1130 "Trigger manual compaction task. {}. cost time: {:?}",
1131 &compact_task_string,
1132 start_time.elapsed(),
1133 );
1134
1135 Ok(())
1136 }
1137
1138 pub fn try_send_compaction_request(
1140 &self,
1141 compaction_group: CompactionGroupId,
1142 task_type: compact_task::TaskType,
1143 ) -> bool {
1144 match self
1145 .compaction_state
1146 .try_sched_compaction(compaction_group, task_type)
1147 {
1148 Ok(_) => true,
1149 Err(e) => {
1150 tracing::error!(
1151 error = %e.as_report(),
1152 "failed to send compaction request for compaction group {}",
1153 compaction_group,
1154 );
1155 false
1156 }
1157 }
1158 }
1159
1160 pub(crate) fn calculate_vnode_partition(
1161 &self,
1162 compact_task: &mut CompactTask,
1163 compaction_config: &CompactionConfig,
1164 ) {
1165 if compact_task.target_level > compact_task.base_level {
1169 return;
1170 }
1171 if compaction_config.split_weight_by_vnode > 0 {
1172 for table_id in &compact_task.existing_table_ids {
1173 compact_task
1174 .table_vnode_partition
1175 .insert(*table_id, compact_task.split_weight_by_vnode);
1176 }
1177 } else {
1178 let mut table_size_info: HashMap<u32, u64> = HashMap::default();
1179 let mut existing_table_ids: HashSet<u32> = HashSet::default();
1180 for input_ssts in &compact_task.input_ssts {
1181 for sst in &input_ssts.table_infos {
1182 existing_table_ids.extend(sst.table_ids.iter());
1183 for table_id in &sst.table_ids {
1184 *table_size_info.entry(*table_id).or_default() +=
1185 sst.sst_size / (sst.table_ids.len() as u64);
1186 }
1187 }
1188 }
1189 compact_task
1190 .existing_table_ids
1191 .retain(|table_id| existing_table_ids.contains(table_id));
1192
1193 let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
1194 let default_partition_count = self.env.opts.partition_vnode_count;
1195 let compact_task_table_size_partition_threshold_low = self
1197 .env
1198 .opts
1199 .compact_task_table_size_partition_threshold_low;
1200 let compact_task_table_size_partition_threshold_high = self
1201 .env
1202 .opts
1203 .compact_task_table_size_partition_threshold_high;
1204 let table_write_throughput_statistic_manager =
1206 self.table_write_throughput_statistic_manager.read();
1207 let timestamp = chrono::Utc::now().timestamp();
1208 for (table_id, compact_table_size) in table_size_info {
1209 let write_throughput = table_write_throughput_statistic_manager
1210 .get_table_throughput_descending(table_id, timestamp)
1211 .peekable()
1212 .peek()
1213 .map(|item| item.throughput)
1214 .unwrap_or(0);
1215 if compact_table_size > compact_task_table_size_partition_threshold_high
1216 && default_partition_count > 0
1217 {
1218 compact_task
1219 .table_vnode_partition
1220 .insert(table_id, default_partition_count);
1221 } else if (compact_table_size > compact_task_table_size_partition_threshold_low
1222 || (write_throughput > self.env.opts.table_high_write_throughput_threshold
1223 && compact_table_size > compaction_config.target_file_size_base))
1224 && hybrid_vnode_count > 0
1225 {
1226 compact_task
1228 .table_vnode_partition
1229 .insert(table_id, hybrid_vnode_count);
1230 } else if compact_table_size > compaction_config.target_file_size_base {
1231 compact_task.table_vnode_partition.insert(table_id, 1);
1233 }
1234 }
1235 compact_task
1236 .table_vnode_partition
1237 .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
1238 }
1239 }
1240
1241 pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
1242 self.compactor_manager.clone()
1243 }
1244}
1245
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Test helper: returns a clone of the assignment recorded for `task_id`, if any.
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let guard = self.compaction.read().await;
        guard.compact_task_assignment.get(&task_id).cloned()
    }

    /// Test helper: reports a single compaction task result.
    ///
    /// When `compact_task` is given, it is first recorded as the assignment for `task_id`
    /// (with `context_id` 0). The report is then sent through `report_compact_tasks`. A
    /// missing `table_stats_change` becomes an empty map, and no object timestamps are sent.
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let assignment = CompactTaskAssignment {
                compact_task: Some(task.into()),
                context_id: 0,
            };
            self.compaction
                .write()
                .await
                .compact_task_assignment
                .insert(task_id, assignment);
        }

        let report = ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        };
        self.report_compact_tasks(vec![report]).await?;
        Ok(())
    }
}
1289
1290#[derive(Debug, Default)]
1291pub struct CompactionState {
1292 scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
1293}
1294
1295impl CompactionState {
1296 pub fn new() -> Self {
1297 Self {
1298 scheduled: Default::default(),
1299 }
1300 }
1301
1302 pub fn try_sched_compaction(
1304 &self,
1305 compaction_group: CompactionGroupId,
1306 task_type: TaskType,
1307 ) -> std::result::Result<bool, SendError<CompactionRequestChannelItem>> {
1308 let mut guard = self.scheduled.lock();
1309 let key = (compaction_group, task_type);
1310 if guard.contains(&key) {
1311 return Ok(false);
1312 }
1313 guard.insert(key);
1314 Ok(true)
1315 }
1316
1317 pub fn unschedule(
1318 &self,
1319 compaction_group: CompactionGroupId,
1320 task_type: compact_task::TaskType,
1321 ) {
1322 self.scheduled.lock().remove(&(compaction_group, task_type));
1323 }
1324
1325 pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1326 let guard = self.scheduled.lock();
1327 if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
1328 Some(compact_task::TaskType::Dynamic)
1329 } else if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
1330 Some(compact_task::TaskType::SpaceReclaim)
1331 } else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
1332 Some(compact_task::TaskType::Ttl)
1333 } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
1334 Some(compact_task::TaskType::Tombstone)
1335 } else if guard.contains(&(group, compact_task::TaskType::VnodeWatermark)) {
1336 Some(compact_task::TaskType::VnodeWatermark)
1337 } else {
1338 None
1339 }
1340 }
1341}
1342
1343impl Compaction {
1344 pub fn get_compact_task_assignments_by_group_id(
1345 &self,
1346 compaction_group_id: CompactionGroupId,
1347 ) -> Vec<CompactTaskAssignment> {
1348 self.compact_task_assignment
1349 .iter()
1350 .filter_map(|(_, assignment)| {
1351 if assignment
1352 .compact_task
1353 .as_ref()
1354 .is_some_and(|task| task.compaction_group_id == compaction_group_id)
1355 {
1356 Some(CompactTaskAssignment {
1357 compact_task: assignment.compact_task.clone(),
1358 context_id: assignment.context_id,
1359 })
1360 } else {
1361 None
1362 }
1363 })
1364 .collect()
1365 }
1366}
1367
/// Statistics for a single compaction group, bundled with that group's configuration.
#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    /// Id of the compaction group these statistics describe.
    pub group_id: CompactionGroupId,
    /// Aggregate size of the group (presumably in bytes — TODO confirm against the producer).
    pub group_size: u64,
    /// Per-state-table value keyed by table id; presumably each table's size within the
    /// group (verify against the producer).
    pub table_statistic: BTreeMap<StateTableId, u64>,
    /// The compaction group (including its config) these statistics belong to.
    pub compaction_group_config: CompactionGroup,
}
1375
1376fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1378 table_stats: &mut PbTableStatsMap,
1379 task: &CompactTask,
1380) {
1381 if task.task_type != TaskType::VnodeWatermark {
1382 return;
1383 }
1384 let mut deleted_table_keys: HashMap<u32, u64> = HashMap::default();
1385 for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1386 assert_eq!(s.table_ids.len(), 1);
1387 let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1388 *e += s.total_key_count;
1389 }
1390 for (table_id, delete_count) in deleted_table_keys {
1391 let Some(stats) = table_stats.get_mut(&table_id) else {
1392 continue;
1393 };
1394 if stats.total_key_count == 0 {
1395 continue;
1396 }
1397 let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1398 let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1399 stats.total_key_count = new_total_key_count;
1401 stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1403 stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1404 }
1405}
1406
/// Health classification of a compaction group, as produced by `GroupStateValidator`.
#[derive(Debug, Clone)]
pub enum GroupState {
    /// No emergency or write-stop threshold was exceeded.
    Normal,

    /// An emergency threshold was exceeded; the payload is a human-readable reason.
    Emergency(String),

    /// A write-stop threshold was exceeded; the payload is a human-readable reason.
    WriteStop(String),
}

impl GroupState {
    /// Returns `true` for [`GroupState::WriteStop`].
    pub fn is_write_stop(&self) -> bool {
        matches!(self, Self::WriteStop(_))
    }

    /// Returns `true` for [`GroupState::Emergency`].
    pub fn is_emergency(&self) -> bool {
        matches!(self, Self::Emergency(_))
    }

    /// Returns the reason carried by `Emergency`/`WriteStop`, or `None` for `Normal`.
    pub fn reason(&self) -> Option<&str> {
        // Exhaustive on purpose: a new variant must decide whether it carries a reason.
        match self {
            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
            Self::Normal => None,
        }
    }
}
1435
1436#[derive(Clone, Default)]
1437pub struct GroupStateValidator;
1438
1439impl GroupStateValidator {
1440 pub fn write_stop_sub_level_count(
1441 level_count: usize,
1442 compaction_config: &CompactionConfig,
1443 ) -> bool {
1444 let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1445 level_count > threshold
1446 }
1447
1448 pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1449 l0_size
1450 > compaction_config
1451 .level0_stop_write_threshold_max_size
1452 .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1453 }
1454
1455 pub fn write_stop_l0_file_count(
1456 l0_file_count: usize,
1457 compaction_config: &CompactionConfig,
1458 ) -> bool {
1459 l0_file_count
1460 > compaction_config
1461 .level0_stop_write_threshold_max_sst_count
1462 .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1463 as usize
1464 }
1465
1466 pub fn emergency_l0_file_count(
1467 l0_file_count: usize,
1468 compaction_config: &CompactionConfig,
1469 ) -> bool {
1470 l0_file_count
1471 > compaction_config
1472 .emergency_level0_sst_file_count
1473 .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1474 as usize
1475 }
1476
1477 pub fn emergency_l0_partition_count(
1478 last_l0_sub_level_partition_count: usize,
1479 compaction_config: &CompactionConfig,
1480 ) -> bool {
1481 last_l0_sub_level_partition_count
1482 > compaction_config
1483 .emergency_level0_sub_level_partition
1484 .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1485 as usize
1486 }
1487
1488 pub fn check_single_group_write_stop(
1489 levels: &Levels,
1490 compaction_config: &CompactionConfig,
1491 ) -> GroupState {
1492 if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1493 return GroupState::WriteStop(format!(
1494 "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1495 levels.l0.sub_levels.len(),
1496 compaction_config.level0_stop_write_threshold_sub_level_number
1497 ));
1498 }
1499
1500 if Self::write_stop_l0_file_count(
1501 levels
1502 .l0
1503 .sub_levels
1504 .iter()
1505 .map(|l| l.table_infos.len())
1506 .sum(),
1507 compaction_config,
1508 ) {
1509 return GroupState::WriteStop(format!(
1510 "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1511 levels
1512 .l0
1513 .sub_levels
1514 .iter()
1515 .map(|l| l.table_infos.len())
1516 .sum::<usize>(),
1517 compaction_config
1518 .level0_stop_write_threshold_max_sst_count
1519 .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1520 ));
1521 }
1522
1523 if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1524 return GroupState::WriteStop(format!(
1525 "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1526 levels.l0.total_file_size,
1527 compaction_config
1528 .level0_stop_write_threshold_max_size
1529 .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1530 ));
1531 }
1532
1533 GroupState::Normal
1534 }
1535
1536 pub fn check_single_group_emergency(
1537 levels: &Levels,
1538 compaction_config: &CompactionConfig,
1539 ) -> GroupState {
1540 if Self::emergency_l0_file_count(
1541 levels
1542 .l0
1543 .sub_levels
1544 .iter()
1545 .map(|l| l.table_infos.len())
1546 .sum(),
1547 compaction_config,
1548 ) {
1549 return GroupState::Emergency(format!(
1550 "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1551 levels
1552 .l0
1553 .sub_levels
1554 .iter()
1555 .map(|l| l.table_infos.len())
1556 .sum::<usize>(),
1557 compaction_config
1558 .emergency_level0_sst_file_count
1559 .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1560 ));
1561 }
1562
1563 if Self::emergency_l0_partition_count(
1564 levels
1565 .l0
1566 .sub_levels
1567 .first()
1568 .map(|l| l.table_infos.len())
1569 .unwrap_or(0),
1570 compaction_config,
1571 ) {
1572 return GroupState::Emergency(format!(
1573 "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1574 levels
1575 .l0
1576 .sub_levels
1577 .first()
1578 .map(|l| l.table_infos.len())
1579 .unwrap_or(0),
1580 compaction_config
1581 .emergency_level0_sub_level_partition
1582 .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1583 ));
1584 }
1585
1586 GroupState::Normal
1587 }
1588
1589 pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1590 let state = Self::check_single_group_write_stop(levels, compaction_config);
1591 if state.is_write_stop() {
1592 return state;
1593 }
1594
1595 Self::check_single_group_emergency(levels, compaction_config)
1596 }
1597}