use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::Instant;

use anyhow::Context;
use compaction_event_loop::{
    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
    HummockCompactorDedicatedEventLoop,
};
use fail::fail_point;
use itertools::Itertools;
use parking_lot::Mutex;
use rand::rng as thread_rng;
use rand::seq::SliceRandom;
use risingwave_common::config::default::compaction_config;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::level::Levels;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
};
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockSstableId, HummockSstableObjectId,
    HummockVersionId, compact_task_to_string, statistics_compact_task,
};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
};
use thiserror_ext::AsReport;
use tokio::sync::RwLockWriteGuard;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;
use tracing::warn;

use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::{
    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
};
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::{
    HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
    trigger_local_table_stat,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::next_compaction_task_id;
use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
use crate::manager::META_NODE_ID;
use crate::model::BTreeMapTransaction;

pub mod compaction_event_loop;
pub mod compaction_group_manager;
pub mod compaction_group_schedule;

static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
    [
        TaskStatus::ManualCanceled,
        TaskStatus::SendFailCanceled,
        TaskStatus::AssignFailCanceled,
        TaskStatus::HeartbeatCanceled,
        TaskStatus::InvalidGroupCanceled,
        TaskStatus::NoAvailMemoryResourceCanceled,
        TaskStatus::NoAvailCpuResourceCanceled,
        TaskStatus::HeartbeatProgressCanceled,
    ]
    .into_iter()
    .collect()
});

type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);

fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
        HashMap::default();
    compaction_selectors.insert(
        compact_task::TaskType::Dynamic,
        Box::<DynamicLevelSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::SpaceReclaim,
        Box::<SpaceReclaimCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Ttl,
        Box::<TtlCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Tombstone,
        Box::<TombstoneCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::VnodeWatermark,
        Box::<VnodeWatermarkCompactionSelector>::default(),
    );
    compaction_selectors
}
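
// A minimal, test-only sketch of the contract of `init_selectors`: every
// auto-schedulable task type resolves to a registered selector, so callers
// can dispatch purely on `compact_task::TaskType`.
#[cfg(test)]
mod init_selectors_example {
    use super::*;

    #[test]
    fn registers_a_selector_per_auto_task_type() {
        let selectors = init_selectors();
        for task_type in [
            compact_task::TaskType::Dynamic,
            compact_task::TaskType::SpaceReclaim,
            compact_task::TaskType::Ttl,
            compact_task::TaskType::Tombstone,
            compact_task::TaskType::VnodeWatermark,
        ] {
            assert!(selectors.contains_key(&task_type));
        }
    }
}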

impl HummockVersionTransaction<'_> {
    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
        let mut version_delta = self.new_delta();
        let trivial_move = compact_task.is_trivial_move_task();
        version_delta.trivial_move = trivial_move;

        let group_deltas = &mut version_delta
            .group_deltas
            .entry(compact_task.compaction_group_id)
            .or_default()
            .group_deltas;
        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
            BTreeMap::default();

        for level in &compact_task.input_ssts {
            let level_idx = level.level_idx;

            removed_table_ids_map
                .entry(level_idx)
                .or_default()
                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
        }

        for (level_idx, removed_table_ids) in removed_table_ids_map {
            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
                level_idx,
                0, // l0_sub_level_id
                removed_table_ids,
                vec![], // inserted_table_infos
                0,      // vnode_partition_count
                compact_task.compaction_group_version_id,
            ));

            group_deltas.push(group_delta);
        }

        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
            compact_task.target_level,
            compact_task.target_sub_level_id,
            HashSet::new(), // removed_table_ids
            compact_task.sorted_output_ssts.clone(),
            compact_task.split_weight_by_vnode,
            compact_task.compaction_group_version_id,
        ));

        group_deltas.push(group_delta);
        version_delta.pre_apply();
    }
}
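
// Illustrative shape of the deltas built above for a task compacting L0 into
// L2: each input level contributes an `IntraLevelDelta` that only removes the
// task's input SSTs, and the target level gets one delta that only inserts
// `sorted_output_ssts`. A trivial move produces the same picture with the
// `trivial_move` flag set, so SSTs are re-linked instead of rewritten.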

#[derive(Default)]
pub struct Compaction {
    /// Compaction tasks that are already assigned to a compactor.
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// `CompactStatus` of each compaction group.
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    pub _deterministic_mode: bool,
}

impl HummockManager {
    pub async fn get_assigned_compact_task_num(&self) -> u64 {
        self.compaction.read().await.compact_task_assignment.len() as u64
    }

    pub async fn list_compaction_status(
        &self,
    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
        let compaction = self.compaction.read().await;
        (
            compaction.compaction_statuses.values().map_into().collect(),
            compaction
                .compact_task_assignment
                .values()
                .cloned()
                .collect(),
        )
    }

    pub async fn get_compaction_scores(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<PickerInfo> {
        let (status, levels, group) = {
            let compaction = self.compaction.read().await;
            let versioning = self.versioning.read().await;
            let config_manager = self.compaction_group_manager.read().await;
            match (
                compaction.compaction_statuses.get(&compaction_group_id),
                versioning.current_version.levels.get(&compaction_group_id),
                config_manager.try_get_compaction_group_config(compaction_group_id),
            ) {
                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
                _ => {
                    return vec![];
                }
            }
        };
        let dynamic_level_core = DynamicLevelSelectorCore::new(
            group.compaction_config,
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
        ctx.score_levels
    }
}

impl HummockManager {
    pub fn compaction_event_loop(
        hummock_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let hummock_compaction_event_handler =
            HummockCompactionEventHandler::new(hummock_manager.clone());

        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
            hummock_manager.clone(),
            hummock_compaction_event_handler.clone(),
        );

        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));

        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
            hummock_manager.env.opts.clone(),
            hummock_compaction_event_handler,
            Some(event_tx),
        );

        let event_loop = HummockCompactionEventLoop::new(
            hummock_compaction_event_dispatcher,
            hummock_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

    pub fn add_compactor_stream(
        &self,
        context_id: u32,
        req_stream: Streaming<SubscribeCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

    pub async fn auto_pick_compaction_group_and_type(
        &self,
    ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                return Some((cg_id, pick_type));
            }
        }

        None
    }

    /// Visits compaction groups in a random order. All groups whose auto-picked
    /// task type is `Dynamic` are batched together; if a group with any other
    /// task type is encountered before the first dynamic group, that single
    /// group and its task type are returned instead.
    async fn auto_pick_compaction_groups_and_type(
        &self,
    ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        let mut normal_groups = vec![];
        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                if pick_type == TaskType::Dynamic {
                    normal_groups.push(cg_id);
                } else if normal_groups.is_empty() {
                    return (vec![cg_id], pick_type);
                }
            }
        }
        (normal_groups, TaskType::Dynamic)
    }
}
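
// Example of the batching rule above (hypothetical shuffle order): if the
// groups yield picks [Dynamic(g1), Ttl(g2), Dynamic(g3)], the result is
// ([g1, g3], Dynamic) and the Ttl pick is skipped for this round; if Ttl(g2)
// is encountered before any dynamic group, the result is ([g2], Ttl).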

impl HummockManager {
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // When the last table of a compaction group is deleted, the group
            // (and its config) may be destroyed; a compaction scheduled for it
            // can still arrive afterwards, so skip groups without a config.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            let task_id = next_compaction_task_id(&self.env).await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                // Lazily initialize the compact status for a new group.
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .map(|table_id| table_id.table_id)
                .collect();

            let mut table_id_to_option: HashMap<u32, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // Only garbage-collect delete keys when the output lands in
                    // the last level; otherwise older versions of a key may
                    // still live in deeper levels.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                    );
                    let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = version
                        .latest_version()
                        .safe_epoch_table_watermarks(&compact_task.existing_table_ids)
                        .into_iter()
                        .partition(|(_table_id, table_watermark)| {
                            matches!(
                                table_watermark.watermark_type,
                                WatermarkSerdeType::PkPrefix
                            )
                        });

                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;

                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            let id = (*table_id).try_into().unwrap();
                            all_versioned_table_schemas.get(&id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID,
                        },
                    );

                    pick_tasks.push(compact_task);
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // No version change is needed when there is no trivial task, so the
            // version lock can be released before committing the transactions.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Initiate heartbeat tracking for the assigned task.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // The task stays pending until a compactor reports its result.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }

    /// Cancels a compaction task no matter whether it is assigned or not.
    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore err")
        )));
        let ret = self
            .cancel_compact_task_impl(vec![task_id], task_status)
            .await?;
        Ok(ret[0])
    }

    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }

    async fn cancel_compact_task_impl(
        &self,
        task_ids: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        assert!(CANCEL_STATUS_SET.contains(&task_status));
        let tasks = task_ids
            .into_iter()
            .map(|task_id| ReportTask {
                task_id,
                task_status,
                sorted_output_ssts: vec![],
                table_stats_change: HashMap::default(),
                object_timestamps: HashMap::default(),
            })
            .collect_vec();
        let rets = self.report_compact_tasks(tasks).await?;
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(rets)
    }

    async fn get_compact_tasks(
        &self,
        mut compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));
        compaction_groups.shuffle(&mut thread_rng());
        let (mut tasks, groups) = self
            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
            .await?;
        tasks.retain(|task| {
            if task.task_status == TaskStatus::Success {
                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
                false
            } else {
                true
            }
        });
        Ok((tasks, groups))
    }

    pub async fn get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<Option<CompactTask>> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));

        let (normal_tasks, _) = self
            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
            .await?;
        for task in normal_tasks {
            if task.task_status != TaskStatus::Success {
                return Ok(Some(task));
            }
            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
        }
        Ok(None)
    }

    pub async fn manual_get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<Option<CompactTask>> {
        let mut selector: Box<dyn CompactionSelector> =
            Box::new(ManualCompactionSelector::new(manual_compaction_option));
        self.get_compact_task(compaction_group_id, &mut selector)
            .await
    }

    pub async fn report_compact_task(
        &self,
        task_id: u64,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
        object_timestamps: HashMap<HummockSstableObjectId, u64>,
    ) -> Result<bool> {
        let rets = self
            .report_compact_tasks(vec![ReportTask {
                task_id,
                task_status,
                sorted_output_ssts,
                table_stats_change: table_stats_change.unwrap_or_default(),
                object_timestamps,
            }])
            .await?;
        Ok(rets[0])
    }

    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
        let compaction_guard = self.compaction.write().await;
        let versioning_guard = self.versioning.write().await;

        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
            .await
    }

    /// Finishes or cancels the given compaction tasks according to their
    /// `task_status`, removes their assignments, and applies the version delta
    /// of every successfully finished task. Returns one flag per input task
    /// indicating whether the report was accepted.
    pub async fn report_compact_tasks_impl(
        &self,
        report_tasks: Vec<ReportTask>,
        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
    ) -> Result<Vec<bool>> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;
        let compaction: &mut Compaction = &mut compaction_guard;
        let start_time = Instant::now();
        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
        let mut rets = vec![false; report_tasks.len()];
        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
        let versioning: &mut Versioning = &mut versioning_guard;
        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");

        for group_id in original_keys {
            // Purge the stale compact status of groups that no longer exist in
            // the current version.
            if !versioning.current_version.levels.contains_key(&group_id) {
                compact_statuses.remove(group_id);
            }
        }
        let mut tasks = vec![];

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        let mut success_count = 0;
        for (idx, task) in report_tasks.into_iter().enumerate() {
            rets[idx] = true;
            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
                None => {
                    tracing::warn!("{}", format!("compact task {} not found", task.task_id));
                    rets[idx] = false;
                    continue;
                }
            };

            {
                compact_task.task_status = task.task_status;
                compact_task.sorted_output_ssts = task.sorted_output_ssts;
            }

            match compact_statuses.get_mut(compact_task.compaction_group_id) {
                Some(mut compact_status) => {
                    compact_status.report_compact_task(&compact_task);
                }
                None => {
                    // The compaction group has been removed (e.g. all of its
                    // tables were dropped), so the task is no longer valid.
                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
                }
            }

            let is_success = if let TaskStatus::Success = compact_task.task_status {
                match self
                    .report_compaction_sanity_check(&task.object_timestamps)
                    .await
                {
                    Err(e) => {
                        warn!(
                            "failed to commit compaction task {} {}",
                            compact_task.task_id,
                            e.as_report()
                        );
                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
                        false
                    }
                    _ => {
                        let group = version
                            .latest_version()
                            .levels
                            .get(&compact_task.compaction_group_id)
                            .unwrap();
                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
                        if is_expired {
                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
                            warn!(
                                "The task may be expired because of group split, task:\n {:?}",
                                compact_task_to_string(&compact_task)
                            );
                        }
                        !is_expired
                    }
                }
            } else {
                false
            };
            if is_success {
                success_count += 1;
                version.apply_compact_task(&compact_task);
                if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version())
                {
                    self.metrics.version_stats.reset();
                    versioning.local_metrics.clear();
                }
                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
                trigger_local_table_stat(
                    &self.metrics,
                    &mut versioning.local_metrics,
                    &version_stats,
                    &task.table_stats_change,
                );
            }
            tasks.push(compact_task);
        }
        if success_count > 0 {
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;

            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_report_task"])
                .observe(success_count as f64);
        } else {
            // No task succeeded, so there is no version change to commit.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment
            )?;
        }

        let mut success_groups = vec![];
        for compact_task in &tasks {
            self.compactor_manager
                .remove_task_heartbeat(compact_task.task_id);
            tracing::trace!(
                "Reported compaction task. {}. cost time: {:?}",
                compact_task_to_string(compact_task),
                start_time.elapsed(),
            );

            if !deterministic_mode
                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
            {
                // Schedule a follow-up dynamic compaction for the group so
                // pending data keeps flowing down the LSM tree.
                self.try_send_compaction_request(
                    compact_task.compaction_group_id,
                    compact_task::TaskType::Dynamic,
                );
            }

            if compact_task.task_status == TaskStatus::Success {
                success_groups.push(compact_task.compaction_group_id);
            }
        }

        trigger_compact_tasks_stat(
            &self.metrics,
            &tasks,
            &compaction.compaction_statuses,
            &versioning_guard.current_version,
        );
        drop(versioning_guard);
        if !success_groups.is_empty() {
            self.try_update_write_limits(&success_groups).await;
        }
        Ok(rets)
    }

    /// Triggers compaction for the given compaction groups without waiting for
    /// the tasks to finish.
    pub async fn trigger_compaction_deterministic(
        &self,
        _base_version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        self.on_current_version(|old_version| {
            tracing::info!(
                "Trigger compaction for version {}, groups {:?}",
                old_version.id,
                compaction_groups
            );
        })
        .await;

        if compaction_groups.is_empty() {
            return Ok(());
        }
        for compaction_group in compaction_groups {
            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
        }
        Ok(())
    }

    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<()> {
        let start_time = Instant::now();

        // Pick any compactor that is currently available.
        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction No compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        let compact_task = self
            .manual_get_compact_task(compaction_group, manual_compaction_option)
            .await;
        let compact_task = match compact_task {
            Ok(Some(compact_task)) => compact_task,
            Ok(None) => {
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };

        let compact_task_string = compact_task_to_string(&compact_task);
        compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })?;

        tracing::info!(
            "Trigger manual compaction task. {}. cost time: {:?}",
            &compact_task_string,
            start_time.elapsed(),
        );

        Ok(())
    }

    /// Sends a compaction request to the compaction scheduler; returns `false`
    /// only if the request could not be queued.
    pub fn try_send_compaction_request(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) -> bool {
        match self
            .compaction_state
            .try_sched_compaction(compaction_group, task_type)
        {
            Ok(_) => true,
            Err(e) => {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to send compaction request for compaction group {}",
                    compaction_group,
                );
                false
            }
        }
    }

    pub(crate) fn calculate_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
    ) {
        // Only compactions that write into L0 or the base level are partitioned
        // by vnode; deeper target levels keep a single partition to avoid
        // generating many small files.
        if compact_task.target_level > compact_task.base_level {
            return;
        }
        if compaction_config.split_weight_by_vnode > 0 {
            for table_id in &compact_task.existing_table_ids {
                compact_task
                    .table_vnode_partition
                    .insert(*table_id, compact_task.split_weight_by_vnode);
            }
        } else {
            let mut table_size_info: HashMap<u32, u64> = HashMap::default();
            let mut existing_table_ids: HashSet<u32> = HashSet::default();
            for input_ssts in &compact_task.input_ssts {
                for sst in &input_ssts.table_infos {
                    existing_table_ids.extend(sst.table_ids.iter());
                    for table_id in &sst.table_ids {
                        *table_size_info.entry(*table_id).or_default() +=
                            sst.sst_size / (sst.table_ids.len() as u64);
                    }
                }
            }
            compact_task
                .existing_table_ids
                .retain(|table_id| existing_table_ids.contains(table_id));

            let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
            let default_partition_count = self.env.opts.partition_vnode_count;
            let compact_task_table_size_partition_threshold_low = self
                .env
                .opts
                .compact_task_table_size_partition_threshold_low;
            let compact_task_table_size_partition_threshold_high = self
                .env
                .opts
                .compact_task_table_size_partition_threshold_high;
            let table_write_throughput_statistic_manager =
                self.table_write_throughput_statistic_manager.read();
            let timestamp = chrono::Utc::now().timestamp();
            for (table_id, compact_table_size) in table_size_info {
                let write_throughput = table_write_throughput_statistic_manager
                    .get_table_throughput_descending(table_id, timestamp)
                    .peekable()
                    .peek()
                    .map(|item| item.throughput)
                    .unwrap_or(0);
                if compact_table_size > compact_task_table_size_partition_threshold_high
                    && default_partition_count > 0
                {
                    // Large table: use the full default partition count.
                    compact_task
                        .table_vnode_partition
                        .insert(table_id, default_partition_count);
                } else if (compact_table_size > compact_task_table_size_partition_threshold_low
                    || (write_throughput > self.env.opts.table_high_write_throughput_threshold
                        && compact_table_size > compaction_config.target_file_size_base))
                    && hybrid_vnode_count > 0
                {
                    // Medium-sized or write-heavy table: use the hybrid
                    // partition count.
                    compact_task
                        .table_vnode_partition
                        .insert(table_id, hybrid_vnode_count);
                } else if compact_table_size > compaction_config.target_file_size_base {
                    // Small table that still exceeds the target file size:
                    // give it a single dedicated partition.
                    compact_task.table_vnode_partition.insert(table_id, 1);
                }
            }
            compact_task
                .table_vnode_partition
                .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
        }
    }
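
    // Worked example of the thresholds above (hypothetical numbers): with
    // threshold_low = 128 MiB, threshold_high = 512 MiB,
    // default_partition_count = 16 and hybrid_vnode_count = 4, a 1 GiB table
    // gets 16 partitions, a 256 MiB table gets 4, and a 40 MiB table that
    // still exceeds `target_file_size_base` (e.g. 32 MiB) gets 1.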

    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockManager {
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction_guard = self.compaction.read().await;
        let assignment_ref = &compaction_guard.compact_task_assignment;
        assignment_ref.get(&task_id).cloned()
    }

    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let mut guard = self.compaction.write().await;
            guard.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0,
                },
            );
        }

        self.report_compact_tasks(vec![ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        }])
        .await?;
        Ok(())
    }
}

#[derive(Debug, Default)]
pub struct CompactionState {
    scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
}

impl CompactionState {
    pub fn new() -> Self {
        Self {
            scheduled: Default::default(),
        }
    }

    /// Enqueues a request only if an identical one is not already pending;
    /// returns `Ok(false)` for duplicates.
    pub fn try_sched_compaction(
        &self,
        compaction_group: CompactionGroupId,
        task_type: TaskType,
    ) -> std::result::Result<bool, SendError<CompactionRequestChannelItem>> {
        let mut guard = self.scheduled.lock();
        let key = (compaction_group, task_type);
        if guard.contains(&key) {
            return Ok(false);
        }
        guard.insert(key);
        Ok(true)
    }

    pub fn unschedule(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) {
        self.scheduled.lock().remove(&(compaction_group, task_type));
    }

    pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
        let guard = self.scheduled.lock();
        if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
            Some(compact_task::TaskType::Dynamic)
        } else if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
            Some(compact_task::TaskType::SpaceReclaim)
        } else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
            Some(compact_task::TaskType::Ttl)
        } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
            Some(compact_task::TaskType::Tombstone)
        } else if guard.contains(&(group, compact_task::TaskType::VnodeWatermark)) {
            Some(compact_task::TaskType::VnodeWatermark)
        } else {
            None
        }
    }
}
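
// A minimal, test-only sketch of the dedup contract of `CompactionState`: the
// first request for a (group, task type) pair is queued, duplicates return
// `Ok(false)` until the pair is unscheduled again.
#[cfg(test)]
mod compaction_state_example {
    use super::*;

    #[test]
    fn dedups_pending_compaction_requests() {
        let state = CompactionState::new();
        assert!(state.try_sched_compaction(1, TaskType::Dynamic).unwrap());
        assert!(!state.try_sched_compaction(1, TaskType::Dynamic).unwrap());
        // A different task type for the same group is tracked separately.
        assert!(state.try_sched_compaction(1, TaskType::Ttl).unwrap());
        state.unschedule(1, TaskType::Dynamic);
        assert!(state.try_sched_compaction(1, TaskType::Dynamic).unwrap());
    }
}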

impl Compaction {
    pub fn get_compact_task_assignments_by_group_id(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<CompactTaskAssignment> {
        self.compact_task_assignment
            .iter()
            .filter_map(|(_, assignment)| {
                if assignment
                    .compact_task
                    .as_ref()
                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
                {
                    Some(CompactTaskAssignment {
                        compact_task: assignment.compact_task.clone(),
                        context_id: assignment.context_id,
                    })
                } else {
                    None
                }
            })
            .collect()
    }
}

#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    pub group_id: CompactionGroupId,
    pub group_size: u64,
    pub table_statistic: BTreeMap<StateTableId, u64>,
    pub compaction_group_config: CompactionGroup,
}

/// Adjusts table stats after a vnode-watermark trivial reclaim task: the key
/// count of each affected table is reduced by the number of reclaimed keys,
/// and the key/value sizes are scaled down proportionally.
fn update_table_stats_for_vnode_watermark_trivial_reclaim(
    table_stats: &mut PbTableStatsMap,
    task: &CompactTask,
) {
    if task.task_type != TaskType::VnodeWatermark {
        return;
    }
    let mut deleted_table_keys: HashMap<u32, u64> = HashMap::default();
    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
        assert_eq!(s.table_ids.len(), 1);
        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
        *e += s.total_key_count;
    }
    for (table_id, delete_count) in deleted_table_keys {
        let Some(stats) = table_stats.get_mut(&table_id) else {
            continue;
        };
        if stats.total_key_count == 0 {
            continue;
        }
        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
        // Scale the key and value sizes by the surviving-key ratio, rounding
        // up so small non-zero stats are not zeroed out.
        stats.total_key_count = new_total_key_count;
        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
    }
}
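
// Worked example of the proportional shrink above (hypothetical numbers): a
// table with total_key_count = 1000 whose reclaimed SSTs hold 250 keys ends
// up with 750 keys and ratio 750 / 1000 = 0.75, so a total_key_size of
// 10_000 bytes becomes ceil(10_000 * 0.75) = 7_500.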

#[derive(Debug, Clone)]
pub enum GroupState {
    /// The compaction group is healthy.
    Normal,

    /// The compaction group is in an emergency state; the payload is the reason.
    Emergency(String),

    /// Writes to the compaction group are stopped; the payload is the reason.
    WriteStop(String),
}

impl GroupState {
    pub fn is_write_stop(&self) -> bool {
        matches!(self, Self::WriteStop(_))
    }

    pub fn is_emergency(&self) -> bool {
        matches!(self, Self::Emergency(_))
    }

    pub fn reason(&self) -> Option<&str> {
        match self {
            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
            _ => None,
        }
    }
}
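
// A minimal, test-only sketch of how callers branch on `GroupState`
// (hypothetical reason string):
#[cfg(test)]
mod group_state_example {
    use super::*;

    #[test]
    fn reason_is_exposed_only_for_abnormal_states() {
        assert!(GroupState::Normal.reason().is_none());
        let stop = GroupState::WriteStop("l0_size: 10, threshold: 5".to_owned());
        assert!(stop.is_write_stop());
        assert!(!stop.is_emergency());
        assert_eq!(stop.reason(), Some("l0_size: 10, threshold: 5"));
    }
}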

#[derive(Clone, Default)]
pub struct GroupStateValidator;

impl GroupStateValidator {
    pub fn write_stop_sub_level_count(
        level_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
        level_count > threshold
    }

    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
        l0_size
            > compaction_config
                .level0_stop_write_threshold_max_size
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
    }

    pub fn write_stop_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .level0_stop_write_threshold_max_sst_count
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
                as usize
    }

    pub fn emergency_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .emergency_level0_sst_file_count
                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
                as usize
    }

    pub fn emergency_l0_partition_count(
        last_l0_sub_level_partition_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        last_l0_sub_level_partition_count
            > compaction_config
                .emergency_level0_sub_level_partition
                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
                as usize
    }

    pub fn check_single_group_write_stop(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
                levels.l0.sub_levels.len(),
                compaction_config.level0_stop_write_threshold_sub_level_number
            ));
        }

        if Self::write_stop_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .level0_stop_write_threshold_max_sst_count
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
            ));
        }

        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
                levels.l0.total_file_size,
                compaction_config
                    .level0_stop_write_threshold_max_size
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
            ));
        }

        GroupState::Normal
    }

    pub fn check_single_group_emergency(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::emergency_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .emergency_level0_sst_file_count
                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
            ));
        }

        if Self::emergency_l0_partition_count(
            levels
                .l0
                .sub_levels
                .first()
                .map(|l| l.table_infos.len())
                .unwrap_or(0),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
                levels
                    .l0
                    .sub_levels
                    .first()
                    .map(|l| l.table_infos.len())
                    .unwrap_or(0),
                compaction_config
                    .emergency_level0_sub_level_partition
                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
            ));
        }

        GroupState::Normal
    }

    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
        let state = Self::check_single_group_write_stop(levels, compaction_config);
        if state.is_write_stop() {
            return state;
        }

        Self::check_single_group_emergency(levels, compaction_config)
    }
}
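
// A minimal, test-only sketch of the threshold checks above: the comparisons
// are strictly greater-than, so a group sitting exactly at the limit is still
// considered healthy (hypothetical config values).
#[cfg(test)]
mod group_state_validator_example {
    use super::*;

    #[test]
    fn sub_level_count_threshold_is_exclusive() {
        let config = CompactionConfig {
            level0_stop_write_threshold_sub_level_number: 4,
            ..Default::default()
        };
        assert!(!GroupStateValidator::write_stop_sub_level_count(4, &config));
        assert!(GroupStateValidator::write_stop_sub_level_count(5, &config));
    }
}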