1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17use std::time::Instant;
18
19use anyhow::Context;
20use compaction_event_loop::{
21 HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
22 HummockCompactorDedicatedEventLoop,
23};
24use fail::fail_point;
25use itertools::Itertools;
26use parking_lot::Mutex;
27use rand::rng as thread_rng;
28use rand::seq::SliceRandom;
29use risingwave_common::catalog::TableId;
30use risingwave_common::config::meta::default::compaction_config;
31use risingwave_common::util::epoch::Epoch;
32use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
33use risingwave_hummock_sdk::compaction_group::StateTableId;
34use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_table_watermarks_impl;
35use risingwave_hummock_sdk::level::Levels;
36use risingwave_hummock_sdk::sstable_info::SstableInfo;
37use risingwave_hummock_sdk::table_stats::{
38 PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
39};
40use risingwave_hummock_sdk::table_watermark::TableWatermarks;
41use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
42use risingwave_hummock_sdk::{
43 CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
44 HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
45};
46use risingwave_meta_model::hummock_sequence::COMPACTION_TASK_ID;
47use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
48use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
49use risingwave_pb::hummock::{
50 CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
51 SubscribeCompactionEventRequest, TableOption, compact_task,
52};
53use thiserror_ext::AsReport;
54use tokio::sync::RwLockWriteGuard;
55use tokio::sync::mpsc::UnboundedReceiver;
56use tokio::sync::oneshot::{Receiver, Sender};
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use crate::hummock::compaction::selector::level_selector::PickerInfo;
62use crate::hummock::compaction::selector::{
63 DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
64 ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
65 TtlCompactionSelector, VnodeWatermarkCompactionSelector,
66};
67use crate::hummock::compaction::{
68 CompactStatus, CompactionDeveloperConfig, CompactionSelector,
69 CompactionTask as PickedCompactionTask,
70};
71use crate::hummock::error::{Error, Result};
72use crate::hummock::manager::CompactionTaskReportResult;
73use crate::hummock::manager::compaction::compact_task_builder::{
74 CompactTaskBuildContext, attach_compact_task_table_metadata, build_base_compact_task,
75};
76use crate::hummock::manager::transaction::{
77 HummockVersionStatsTransaction, HummockVersionTransaction,
78};
79use crate::hummock::manager::versioning::Versioning;
80use crate::hummock::metrics_utils::{
81 build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
82 trigger_local_table_stat,
83};
84use crate::hummock::model::CompactionGroup;
85use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
86use crate::manager::META_NODE_ID;
87use crate::model::BTreeMapTransaction;
88
/// Outcome of [`HummockManager::trigger_manual_compaction`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ManualCompactionTriggerResult {
    /// The task was dispatched to a compactor and its report round-trip completed.
    Submitted,
    /// No task could run right now (exclusive compaction blocked by pending
    /// tasks, or the compactor cancelled for lack of CPU/memory); the caller
    /// may retry.
    Retry,
}
94
// Builds base compact tasks and attaches table metadata (used by `get_compact_tasks_impl`).
mod compact_task_builder;
// Event loops dispatching compactor subscribe-stream events.
pub mod compaction_event_loop;
// Compaction-group configuration management.
pub mod compaction_group_manager;
// Compaction-group scheduling (split/merge), per module name — see module for details.
pub mod compaction_group_schedule;
99
100static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
101 [
102 TaskStatus::ManualCanceled,
103 TaskStatus::SendFailCanceled,
104 TaskStatus::AssignFailCanceled,
105 TaskStatus::HeartbeatCanceled,
106 TaskStatus::InvalidGroupCanceled,
107 TaskStatus::NoAvailMemoryResourceCanceled,
108 TaskStatus::NoAvailCpuResourceCanceled,
109 TaskStatus::HeartbeatProgressCanceled,
110 ]
111 .into_iter()
112 .collect()
113});
114
115fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
116 let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
117 HashMap::default();
118 compaction_selectors.insert(
119 compact_task::TaskType::Dynamic,
120 Box::<DynamicLevelSelector>::default(),
121 );
122 compaction_selectors.insert(
123 compact_task::TaskType::SpaceReclaim,
124 Box::<SpaceReclaimCompactionSelector>::default(),
125 );
126 compaction_selectors.insert(
127 compact_task::TaskType::Ttl,
128 Box::<TtlCompactionSelector>::default(),
129 );
130 compaction_selectors.insert(
131 compact_task::TaskType::Tombstone,
132 Box::<TombstoneCompactionSelector>::default(),
133 );
134 compaction_selectors.insert(
135 compact_task::TaskType::VnodeWatermark,
136 Box::<VnodeWatermarkCompactionSelector>::default(),
137 );
138 compaction_selectors
139}
140
/// Result of `build_ready_compact_task` (variant meaning grounded in the
/// usage inside `get_compact_tasks_impl`).
enum BuiltCompactTask {
    /// Already finalized on the meta side (e.g. a trivial move): applied to
    /// the version transaction directly, no compactor round-trip needed.
    MetaFinished(CompactTask),
    /// Must be executed by a compactor: persisted into
    /// `compact_task_assignment` and returned for dispatch.
    PendingAssignment(CompactTask),
}
145
impl HummockVersionTransaction<'_> {
    /// Translates a finished `compact_task` into group deltas (input SSTs
    /// removed per level, output SSTs inserted into the target level) and
    /// pre-applies them to this in-flight version transaction.
    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
        let mut version_delta = self.new_delta();
        let trivial_move = compact_task.is_trivial_move_task();
        version_delta.trivial_move = trivial_move;

        let group_deltas = &mut version_delta
            .group_deltas
            .entry(compact_task.compaction_group_id)
            .or_default()
            .group_deltas;
        // level idx -> SST ids consumed from that level by this task.
        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
            BTreeMap::default();

        for level in &compact_task.input_ssts {
            let level_idx = level.level_idx;

            removed_table_ids_map
                .entry(level_idx)
                .or_default()
                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
        }

        // One removal delta per input level.
        for (level_idx, removed_table_ids) in removed_table_ids_map {
            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
                level_idx,
                // NOTE(review): positional args of `IntraLevelDelta::new` —
                // the two zeros presumably are sub-level id and vnode
                // partition count; confirm against the constructor.
                0,
                removed_table_ids,
                vec![], // nothing inserted by a removal delta
                0,
                compact_task.compaction_group_version_id,
            ));

            group_deltas.push(group_delta);
        }

        // One insertion delta for the target level carrying the task output.
        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
            compact_task.target_level,
            compact_task.target_sub_level_id,
            HashSet::new(), // nothing removed from the target level here
            compact_task.sorted_output_ssts.clone(),
            compact_task.split_weight_by_vnode,
            compact_task.compaction_group_version_id,
        ));

        group_deltas.push(group_delta);
        version_delta.pre_apply();
    }
}
195
/// In-memory compaction bookkeeping, guarded by `HummockManager::compaction`.
#[derive(Default)]
pub struct Compaction {
    // Tasks handed out and not yet reported, keyed by task id.
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    // Per-compaction-group compact status (level handlers), keyed by group id.
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    // Not read anywhere in this file — NOTE(review): looks vestigial, confirm
    // before removing.
    pub _deterministic_mode: bool,
}
205
206impl HummockManager {
207 pub async fn get_assigned_compact_task_num(&self) -> u64 {
208 self.compaction.read().await.compact_task_assignment.len() as u64
209 }
210
211 pub async fn list_compaction_status(
212 &self,
213 ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
214 let compaction = self.compaction.read().await;
215 (
216 compaction.compaction_statuses.values().map_into().collect(),
217 compaction
218 .compact_task_assignment
219 .values()
220 .cloned()
221 .collect(),
222 )
223 }
224
225 pub async fn get_compaction_scores(
226 &self,
227 compaction_group_id: CompactionGroupId,
228 ) -> Vec<PickerInfo> {
229 let (status, levels, group) = {
230 let compaction = self.compaction.read().await;
231 let versioning = self.versioning.read().await;
232 let config_manager = self.compaction_group_manager.read().await;
233 match (
234 compaction.compaction_statuses.get(&compaction_group_id),
235 versioning.current_version.levels.get(&compaction_group_id),
236 config_manager.try_get_compaction_group_config(compaction_group_id),
237 ) {
238 (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
239 _ => {
240 return vec![];
241 }
242 }
243 };
244 let dynamic_level_core = DynamicLevelSelectorCore::new(
245 group.compaction_config,
246 Arc::new(CompactionDeveloperConfig::default()),
247 );
248 let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
249 ctx.score_levels
250 }
251}
252
253impl HummockManager {
254 pub fn compaction_event_loop(
255 hummock_manager: Arc<Self>,
256 compactor_streams_change_rx: UnboundedReceiver<(
257 HummockContextId,
258 Streaming<SubscribeCompactionEventRequest>,
259 )>,
260 ) -> Vec<(JoinHandle<()>, Sender<()>)> {
261 let mut join_handle_vec = Vec::default();
262
263 let hummock_compaction_event_handler =
264 HummockCompactionEventHandler::new(hummock_manager.clone());
265
266 let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
267 hummock_manager.clone(),
268 hummock_compaction_event_handler.clone(),
269 );
270
271 let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
272 join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));
273
274 let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
275 hummock_manager.env.opts.clone(),
276 hummock_compaction_event_handler,
277 Some(event_tx),
278 );
279
280 let event_loop = HummockCompactionEventLoop::new(
281 hummock_compaction_event_dispatcher,
282 hummock_manager.metrics.clone(),
283 compactor_streams_change_rx,
284 );
285
286 let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
287 join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
288
289 join_handle_vec
290 }
291
292 pub fn add_compactor_stream(
293 &self,
294 context_id: HummockContextId,
295 req_stream: Streaming<SubscribeCompactionEventRequest>,
296 ) {
297 self.compactor_streams_change_tx
298 .send((context_id, req_stream))
299 .unwrap();
300 }
301}
302
303impl HummockManager {
    /// Returns the next compaction task id, refilling the local prefetch
    /// cache from the meta-store sequence (`COMPACTION_TASK_ID`) in intervals
    /// of `refill_capacity` when the cache runs out.
    async fn next_compaction_task_id_with_prefetch(&self, refill_capacity: u32) -> Result<u64> {
        self.prefetched_compaction_task_ids
            .next(refill_capacity, |count| async move {
                self.env
                    .hummock_seq
                    .next_interval(COMPACTION_TASK_ID, count)
                    .await
            })
            .await
    }
316
    /// Picks up to `max_select_count` compact tasks across `compaction_groups`
    /// using `selector`.
    ///
    /// Meta-finished (trivial) tasks are applied to the version and committed
    /// here directly; pending tasks are persisted as assignments and returned
    /// for dispatch to compactors. Also returns the groups for which no task
    /// could be scheduled. Trivial tasks are appended to the returned task
    /// list too (with `Success` status), so callers must filter them.
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut dyn CompactionSelector,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        // Lock order: compaction before versioning (same as the report path).
        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        // Versioned table schemas are only needed for dropped-column reclaim.
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            // Skip groups that no longer exist in the latest version.
            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            let task_id = self
                .next_compaction_task_id_with_prefetch(
                    self.env.opts.compaction_task_id_refill_capacity,
                )
                .await?;

            // Lazily create the compact status for a newly seen group.
            if !compaction_statuses.contains_key(&compaction_group_id) {
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            // Keep asking the selector for tasks in this group: meta-finished
            // tasks are consumed immediately and the loop continues; the first
            // pending task ends this group's loop.
            while let Some(picked_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let compaction_group_levels = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id);
                let target_level_id = picked_task.input.target_level as u32;
                let is_target_level_last = compaction_group_levels.is_last_level(target_level_id);
                let table_options = table_id_to_option
                    .iter()
                    .map(|(table_id, table_option)| (*table_id, TableOption::from(table_option)))
                    .collect();
                let built_compact_task = self.build_ready_compact_task(
                    picked_task,
                    CompactTaskBuildContext {
                        task_id,
                        compaction_group_id: group_config.group_id,
                        compaction_group_version_id: compaction_group_levels
                            .compaction_group_version_id,
                        existing_table_ids: member_table_ids.clone(),
                        table_options,
                        is_target_level_last,
                        compaction_config: group_config.compaction_config.clone(),
                        current_epoch_time: Epoch::now().0,
                    },
                    &version.latest_version().table_watermarks,
                    &all_versioned_table_schemas,
                );

                match built_compact_task {
                    // Finished on the meta side (trivial move/reclaim): apply
                    // to the version now; no compactor involved.
                    BuiltCompactTask::MetaFinished(compact_task) => {
                        let label = compact_task.task_label();
                        tracing::debug!(
                            "{} for compaction group {}: input: {:?}, cost time: {:?}",
                            label,
                            compact_task.compaction_group_id,
                            compact_task.input_ssts,
                            start_time.elapsed()
                        );
                        compact_status.report_compact_task(&compact_task);
                        update_table_stats_for_vnode_watermark_trivial_reclaim(
                            &mut version_stats.table_stats,
                            &compact_task,
                        );
                        self.metrics
                            .compact_frequency
                            .with_label_values(&[
                                label,
                                &compact_task.compaction_group_id.to_string(),
                                selector.task_type().as_str_name(),
                                "SUCCESS",
                            ])
                            .inc();

                        version.apply_compact_task(&compact_task);
                        trivial_tasks.push(compact_task);
                        // Bound the work done in a single scheduling loop.
                        if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop
                        {
                            break 'outside;
                        }
                    }
                    // Needs a compactor: persist the assignment and hand out.
                    BuiltCompactTask::PendingAssignment(compact_task) => {
                        compact_task_assignment.insert(
                            compact_task.task_id,
                            CompactTaskAssignment {
                                compact_task: Some(compact_task.clone().into()),
                                // Not yet assigned to a real compactor context.
                                context_id: META_NODE_ID,
                            },
                        );

                        pick_tasks.push(compact_task);
                        break;
                    }
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            // No pending task picked for this group => report it unschedulable.
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            // Trivial tasks changed the version: commit everything together.
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // No version change: release the versioning lock before committing
            // only statuses/assignments.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Heartbeat tracking starts as soon as the task is handed out.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        // Trivial tasks ride along with Success status; callers filter them.
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }
641
642 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
644 fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
645 anyhow::anyhow!("failpoint metastore err")
646 )));
647 let ret = self
648 .cancel_compact_task_impl(vec![task_id], task_status)
649 .await?;
650 Ok(ret[0])
651 }
652
    /// Batch variant of [`Self::cancel_compact_task`]: cancels `tasks` with
    /// the given cancellation status, returning one reported-flag per task.
    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }
660
661 async fn cancel_compact_task_impl(
662 &self,
663 task_ids: Vec<u64>,
664 task_status: TaskStatus,
665 ) -> Result<Vec<bool>> {
666 assert!(CANCEL_STATUS_SET.contains(&task_status));
667 let tasks = task_ids
668 .into_iter()
669 .map(|task_id| ReportTask {
670 task_id,
671 task_status,
672 sorted_output_ssts: vec![],
673 table_stats_change: HashMap::default(),
674 object_timestamps: HashMap::default(),
675 })
676 .collect_vec();
677 let rets = self.report_compact_tasks(tasks).await?;
678 #[cfg(test)]
679 {
680 self.check_state_consistency().await;
681 }
682 Ok(rets)
683 }
684
685 async fn get_compact_tasks(
686 &self,
687 mut compaction_groups: Vec<CompactionGroupId>,
688 max_select_count: usize,
689 selector: &mut dyn CompactionSelector,
690 ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
691 fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
692 anyhow::anyhow!("failpoint metastore error")
693 )));
694 compaction_groups.shuffle(&mut thread_rng());
695 let (mut tasks, groups) = self
696 .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
697 .await?;
698 tasks.retain(|task| {
699 if task.task_status == TaskStatus::Success {
700 debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
701 false
702 } else {
703 true
704 }
705 });
706 Ok((tasks, groups))
707 }
708
709 pub async fn get_compact_task(
710 &self,
711 compaction_group_id: CompactionGroupId,
712 selector: &mut dyn CompactionSelector,
713 ) -> Result<Option<CompactTask>> {
714 fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
715 anyhow::anyhow!("failpoint metastore error")
716 )));
717
718 let (normal_tasks, _) = self
719 .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
720 .await?;
721 for task in normal_tasks {
722 if task.task_status != TaskStatus::Success {
723 return Ok(Some(task));
724 }
725 debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
726 }
727 Ok(None)
728 }
729
730 pub async fn manual_get_compact_task(
731 &self,
732 compaction_group_id: CompactionGroupId,
733 manual_compaction_option: ManualCompactionOption,
734 ) -> Result<Option<CompactTask>> {
735 let (task, _) = self
736 .manual_get_compact_task_with_info(compaction_group_id, manual_compaction_option)
737 .await?;
738 Ok(task)
739 }
740
    /// Picks a manual compaction task for the group according to
    /// `manual_compaction_option`. Also returns whether the selector was
    /// blocked by pending tasks, so callers can decide to retry.
    pub async fn manual_get_compact_task_with_info(
        &self,
        compaction_group_id: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<(Option<CompactTask>, bool)> {
        let mut selector = ManualCompactionSelector::new(manual_compaction_option);
        let task = self
            .get_compact_task(compaction_group_id, &mut selector)
            .await?;
        Ok((task, selector.blocked_by_pending()))
    }
752
753 pub async fn report_compact_task(
754 &self,
755 task_id: u64,
756 task_status: TaskStatus,
757 sorted_output_ssts: Vec<SstableInfo>,
758 table_stats_change: Option<PbTableStatsMap>,
759 object_timestamps: HashMap<HummockSstableObjectId, u64>,
760 ) -> Result<bool> {
761 let rets = self
762 .report_compact_tasks(vec![ReportTask {
763 task_id,
764 task_status,
765 sorted_output_ssts,
766 table_stats_change: table_stats_change.unwrap_or_default(),
767 object_timestamps,
768 }])
769 .await?;
770 Ok(rets[0])
771 }
772
    /// Reports a batch of compaction task results, taking the compaction and
    /// versioning write locks (in that order — same order as the pick path)
    /// before delegating to [`Self::report_compact_tasks_impl`].
    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
        let compaction_guard = self.compaction.write().await;
        let versioning_guard = self.versioning.write().await;

        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
            .await
    }
780
781 pub async fn report_compact_tasks_impl(
789 &self,
790 report_tasks: Vec<ReportTask>,
791 mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
792 mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
793 ) -> Result<Vec<bool>> {
794 let deterministic_mode = self.env.opts.compaction_deterministic_test;
795 let compaction: &mut Compaction = &mut compaction_guard;
796 let start_time = Instant::now();
797 let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
798 let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
799 let mut rets = vec![false; report_tasks.len()];
800 let mut compact_task_assignment =
801 BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
802 let versioning: &mut Versioning = &mut versioning_guard;
804 let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");
805
806 for group_id in original_keys {
808 if !versioning.current_version.levels.contains_key(&group_id) {
809 compact_statuses.remove(group_id);
810 }
811 }
812 let mut tasks = vec![];
813
814 let mut version = HummockVersionTransaction::new(
815 &mut versioning.current_version,
816 &mut versioning.hummock_version_deltas,
817 &mut versioning.table_change_log,
818 self.env.notification_manager(),
819 None,
820 &self.metrics,
821 &self.env.opts,
822 );
823
824 if deterministic_mode {
825 version.disable_apply_to_txn();
826 }
827
828 let mut version_stats = HummockVersionStatsTransaction::new(
829 &mut versioning.version_stats,
830 self.env.notification_manager(),
831 );
832 let mut success_count = 0;
833 let mut report_results = Vec::with_capacity(rets.len());
834 for (idx, task) in report_tasks.into_iter().enumerate() {
835 rets[idx] = true;
836 let task_id = task.task_id;
837 let mut task_status = task.task_status;
838 let mut compact_task = match compact_task_assignment.remove(task.task_id) {
839 Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
840 None => {
841 tracing::warn!("{}", format!("compact task {} not found", task.task_id));
842 rets[idx] = false;
843 report_results.push(CompactionTaskReportResult {
844 task_id,
845 task_status,
846 reported: false,
847 });
848 continue;
849 }
850 };
851
852 {
853 compact_task.task_status = task.task_status;
855 compact_task.sorted_output_ssts = task.sorted_output_ssts;
856 }
857
858 match compact_statuses.get_mut(compact_task.compaction_group_id) {
859 Some(mut compact_status) => {
860 compact_status.report_compact_task(&compact_task);
861 }
862 None => {
863 compact_task.task_status = TaskStatus::InvalidGroupCanceled;
869 }
870 }
871
872 let is_success = if let TaskStatus::Success = compact_task.task_status {
873 match self
874 .report_compaction_sanity_check(&task.object_timestamps)
875 .await
876 {
877 Err(e) => {
878 warn!(
879 "failed to commit compaction task {} {}",
880 compact_task.task_id,
881 e.as_report()
882 );
883 compact_task.task_status = TaskStatus::RetentionTimeRejected;
884 false
885 }
886 _ => {
887 let group = version
888 .latest_version()
889 .levels
890 .get(&compact_task.compaction_group_id)
891 .unwrap();
892 let is_expired = compact_task.is_expired(group.compaction_group_version_id);
893 if is_expired {
894 compact_task.task_status = TaskStatus::InputOutdatedCanceled;
895 warn!(
896 "The task may be expired because of group split, task:\n {:?}",
897 compact_task_to_string(&compact_task)
898 );
899 }
900 !is_expired
901 }
902 }
903 } else {
904 false
905 };
906 if is_success {
907 success_count += 1;
908 version.apply_compact_task(&compact_task);
909 if purge_prost_table_stats(
910 &mut version_stats.table_stats,
911 version.latest_version(),
912 &HashSet::default(),
913 ) {
914 self.metrics.version_stats.reset();
915 versioning.local_metrics.clear();
916 }
917 add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
918 trigger_local_table_stat(
919 &self.metrics,
920 &mut versioning.local_metrics,
921 &version_stats,
922 &task.table_stats_change,
923 );
924 }
925 task_status = compact_task.task_status;
926 report_results.push(CompactionTaskReportResult {
927 task_id,
928 task_status,
929 reported: rets[idx],
930 });
931 tasks.push(compact_task);
932 }
933 if success_count > 0 {
934 commit_multi_var!(
935 self.meta_store_ref(),
936 compact_statuses,
937 compact_task_assignment,
938 version,
939 version_stats
940 )?;
941
942 self.metrics
943 .compact_task_batch_count
944 .with_label_values(&["batch_report_task"])
945 .observe(success_count as f64);
946 } else {
947 commit_multi_var!(
949 self.meta_store_ref(),
950 compact_statuses,
951 compact_task_assignment
952 )?;
953 }
954
955 self.notify_compaction_task_report_waiters(report_results);
956
957 let mut success_groups = vec![];
958 for compact_task in &tasks {
959 self.compactor_manager
960 .remove_task_heartbeat(compact_task.task_id);
961 tracing::trace!(
962 "Reported compaction task. {}. cost time: {:?}",
963 compact_task_to_string(compact_task),
964 start_time.elapsed(),
965 );
966
967 if !deterministic_mode
968 && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
969 || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
970 {
971 self.try_send_compaction_request(
973 compact_task.compaction_group_id,
974 compact_task::TaskType::Dynamic,
975 );
976 }
977
978 if compact_task.task_status == TaskStatus::Success {
979 success_groups.push(compact_task.compaction_group_id);
980 }
981 }
982
983 trigger_compact_tasks_stat(
984 &self.metrics,
985 &tasks,
986 &compaction.compaction_statuses,
987 &versioning_guard.current_version,
988 );
989 drop(versioning_guard);
990 if !success_groups.is_empty() {
991 self.try_update_write_limits(&success_groups).await;
992 }
993 Ok(rets)
994 }
995
996 pub async fn trigger_compaction_deterministic(
999 &self,
1000 _base_version_id: HummockVersionId,
1001 compaction_groups: Vec<CompactionGroupId>,
1002 ) -> Result<()> {
1003 self.on_current_version(|old_version| {
1004 tracing::info!(
1005 "Trigger compaction for version {}, groups {:?}",
1006 old_version.id,
1007 compaction_groups
1008 );
1009 })
1010 .await;
1011
1012 if compaction_groups.is_empty() {
1013 return Ok(());
1014 }
1015 for compaction_group in compaction_groups {
1016 self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1017 }
1018 Ok(())
1019 }
1020
    /// Triggers a manual compaction on `compaction_group` and waits until the
    /// resulting task has been reported back by the compactor.
    ///
    /// Returns [`ManualCompactionTriggerResult::Retry`] when the trigger is
    /// retryable (exclusive compaction blocked by pending tasks, or the
    /// compactor cancelled for lack of CPU/memory); `Submitted` on success.
    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<ManualCompactionTriggerResult> {
        let start_time = Instant::now();
        let exclusive = manual_compaction_option.exclusive;

        // Pick a compactor first; bail out early when none is registered.
        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction No compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        let compact_task = self
            .manual_get_compact_task_with_info(compaction_group, manual_compaction_option)
            .await;
        let (compact_task, blocked_by_pending) = match compact_task {
            Ok((compact_task, blocked_by_pending)) => (compact_task, blocked_by_pending),
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };
        let compact_task = match compact_task {
            Some(compact_task) => compact_task,
            None => {
                // An exclusive manual compaction blocked by pending tasks is
                // retryable rather than an error.
                if exclusive && blocked_by_pending {
                    return Ok(ManualCompactionTriggerResult::Retry);
                }
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        let task_id = compact_task.task_id;
        let compact_task_string = compact_task_to_string(&compact_task);
        tracing::info!(
            compact_task_string,
            duration = ?start_time.elapsed(),
            "Triggered manual compaction task."
        );

        // Register the waiter BEFORE sending, so the report cannot race past
        // us; every failure path below must remove it again.
        let report_rx = self.register_compaction_task_report_waiter(task_id);
        if let Err(err) = compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })
        {
            self.remove_compaction_task_report_waiter(task_id);
            return Err(err.into());
        }

        let report_result = match report_rx.await {
            Ok(result) => result,
            Err(_) => {
                // Sender dropped without a report.
                self.remove_compaction_task_report_waiter(task_id);
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction wait report failed. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };
        if !report_result.reported {
            return Err(anyhow::anyhow!(
                "trigger_manual_compaction report not accepted. task_id {}",
                report_result.task_id
            )
            .into());
        }

        // Resource-exhaustion cancellations are transient: let the caller retry.
        if report_result.task_status == TaskStatus::NoAvailCpuResourceCanceled
            || report_result.task_status == TaskStatus::NoAvailMemoryResourceCanceled
        {
            return Ok(ManualCompactionTriggerResult::Retry);
        }

        tracing::info!(
            ?report_result,
            duration = ?start_time.elapsed(),
            "Completed manual compaction task."
        );

        Ok(ManualCompactionTriggerResult::Submitted)
    }
1130
1131 pub fn try_send_compaction_request(
1133 &self,
1134 compaction_group: CompactionGroupId,
1135 task_type: compact_task::TaskType,
1136 ) -> bool {
1137 self.compaction_state.try_sched_compaction(
1138 compaction_group,
1139 task_type,
1140 ScheduleTrigger::NewData,
1141 )
1142 }
1143
    /// Fills `compact_task.table_vnode_partition`, deciding how many vnode
    /// partitions each table's output should be split into.
    ///
    /// If the group's config pins `split_weight_by_vnode`, every table uses the
    /// task's `split_weight_by_vnode` and we return early. Otherwise the
    /// partition count is chosen per table from its estimated size in this
    /// task's input and its most recent write throughput:
    /// - size above the high threshold: `partition_vnode_count`
    /// - size above the low threshold, or high-throughput and larger than one
    ///   target file: `hybrid_partition_node_count`
    /// - larger than one target file: a single partition
    fn apply_split_weight_by_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
        compact_table_ids: &[TableId],
    ) {
        if compaction_config.split_weight_by_vnode > 0 {
            for table_id in compact_table_ids {
                compact_task
                    .table_vnode_partition
                    .insert(*table_id, compact_task.split_weight_by_vnode);
            }

            return;
        }

        // Estimate each table's share of the input: every sst's size is split
        // evenly among the tables it contains.
        let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
        for input_ssts in &compact_task.input_ssts {
            for sst in &input_ssts.table_infos {
                for table_id in &sst.table_ids {
                    *table_size_info.entry(*table_id).or_default() +=
                        sst.sst_size / (sst.table_ids.len() as u64);
                }
            }
        }

        let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
        let default_partition_count = self.env.opts.partition_vnode_count;
        let compact_task_table_size_partition_threshold_low = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_low;
        let compact_task_table_size_partition_threshold_high = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_high;

        let table_write_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.read();
        let timestamp = chrono::Utc::now().timestamp();

        for (table_id, compact_table_size) in table_size_info {
            // Most recent throughput sample for the table; 0 when none recorded.
            let write_throughput = table_write_throughput_statistic_manager
                .get_table_throughput_descending(table_id, timestamp)
                .peekable()
                .peek()
                .map(|item| item.throughput)
                .unwrap_or(0);

            if compact_table_size > compact_task_table_size_partition_threshold_high
                && default_partition_count > 0
            {
                compact_task
                    .table_vnode_partition
                    .insert(table_id, default_partition_count);
            } else if (compact_table_size > compact_task_table_size_partition_threshold_low
                || (write_throughput > self.env.opts.table_high_write_throughput_threshold
                    && compact_table_size > compaction_config.target_file_size_base))
                && hybrid_vnode_count > 0
            {
                // Medium-sized or high-throughput tables get the (smaller)
                // hybrid partition count.
                compact_task
                    .table_vnode_partition
                    .insert(table_id, hybrid_vnode_count);
            } else if compact_table_size > compaction_config.target_file_size_base {
                compact_task.table_vnode_partition.insert(table_id, 1);
            }
        }

        // Keep only entries for tables that actually belong to this task.
        compact_task
            .table_vnode_partition
            .retain(|table_id, _| compact_table_ids.contains(table_id));
    }
1220
1221 pub(crate) fn calculate_vnode_partition(
1222 &self,
1223 compact_task: &mut CompactTask,
1224 compaction_config: &CompactionConfig,
1225 compact_table_ids: &[TableId],
1226 ) {
1227 if compact_task.target_level > compact_task.base_level {
1232 return;
1233 }
1234
1235 self.apply_split_weight_by_vnode_partition(
1237 compact_task,
1238 compaction_config,
1239 compact_table_ids,
1240 );
1241 }
1242
    /// Builds a `BuiltCompactTask` from a picked task.
    ///
    /// Trivial-reclaim and trivial-move tasks need no compactor work: they are
    /// marked `Success` immediately and returned as `MetaFinished` so the meta
    /// node can apply them directly. Every other task is prepared for
    /// assignment (vnode partitioning plus table metadata) and returned as
    /// `PendingAssignment`.
    fn build_ready_compact_task(
        &self,
        picked_task: PickedCompactionTask,
        context: CompactTaskBuildContext,
        table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
        all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
    ) -> BuiltCompactTask {
        let compaction_config = context.compaction_config.clone();
        let (mut compact_task, compact_table_ids) = build_base_compact_task(picked_task, context);

        if compact_task.is_trivial_reclaim() {
            // Inputs are dropped wholesale; nothing to rewrite.
            compact_task.task_status = TaskStatus::Success;
            compact_task.sorted_output_ssts.clear();
            return BuiltCompactTask::MetaFinished(compact_task);
        }

        if compact_task.is_trivial_move_task() {
            // Input ssts move to the target level unchanged.
            compact_task.task_status = TaskStatus::Success;
            compact_task.sorted_output_ssts = compact_task.input_ssts[0]
                .read_sstable_infos()
                .cloned()
                .collect();
            return BuiltCompactTask::MetaFinished(compact_task);
        }

        self.prepare_compact_task_for_assignment(
            &mut compact_task,
            compaction_config.as_ref(),
            &compact_table_ids,
            safe_epoch_table_watermarks_impl(table_watermarks, &compact_table_ids),
            all_versioned_table_schemas,
        );

        BuiltCompactTask::PendingAssignment(compact_task)
    }
1278
    /// Final preparation before a task can be handed to a compactor: computes
    /// vnode partitioning, then attaches per-table metadata (watermarks and
    /// versioned schemas) to the task.
    fn prepare_compact_task_for_assignment(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
        compact_table_ids: &[TableId],
        table_watermarks: BTreeMap<TableId, TableWatermarks>,
        all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
    ) {
        self.calculate_vnode_partition(compact_task, compaction_config, compact_table_ids);
        attach_compact_task_table_metadata(
            compact_task,
            compact_table_ids,
            table_watermarks,
            all_versioned_table_schemas,
        );
    }
1295
    /// Returns a cloned handle to the shared compactor manager.
    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
1299
1300 fn register_compaction_task_report_waiter(
1301 &self,
1302 task_id: HummockCompactionTaskId,
1303 ) -> Receiver<CompactionTaskReportResult> {
1304 let (tx, rx) = tokio::sync::oneshot::channel();
1305 self.compaction_task_report_notifiers
1306 .lock()
1307 .register(task_id, tx);
1308 rx
1309 }
1310
    /// Drops the report waiter for `task_id`, e.g. when sending the task to a
    /// compactor failed or the waiter gave up before a report arrived.
    fn remove_compaction_task_report_waiter(&self, task_id: HummockCompactionTaskId) {
        self.compaction_task_report_notifiers.lock().remove(task_id);
    }
1314
1315 fn notify_compaction_task_report_waiters(&self, results: Vec<CompactionTaskReportResult>) {
1316 let mut guard = self.compaction_task_report_notifiers.lock();
1317 for result in results {
1318 guard.notify(result);
1319 }
1320 }
1321}
1322
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Test helper: fetches the current assignment for `task_id`, if any.
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction_guard = self.compaction.read().await;
        let assignment_ref = &compaction_guard.compact_task_assignment;
        assignment_ref.get(&task_id).cloned()
    }

    /// Test helper: reports a task result. When `compact_task` is provided, an
    /// assignment (with a dummy context id of 0) is inserted first so the
    /// report can be matched against it.
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let mut guard = self.compaction.write().await;
            guard.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0.into(),
                },
            );
        }

        self.report_compact_tasks(vec![ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        }])
        .await?;
        Ok(())
    }
}
1366
/// Why a compaction was requested; drives the dynamic-compaction cooldown
/// bookkeeping in `CompactionState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleTrigger {
    /// New data arrived for the group; clears any dynamic cooldown.
    NewData,
    /// Fired by the periodic scheduler; respects the dynamic cooldown.
    Periodic,
}
1375
1376pub struct CompactionScheduleSnapshot {
1381 scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
1382 snapshot_time: Instant,
1383}
1384
1385impl CompactionScheduleSnapshot {
1386 const TASK_TYPE_PRIORITY: &[TaskType] = &[
1388 TaskType::Dynamic,
1389 TaskType::SpaceReclaim,
1390 TaskType::Ttl,
1391 TaskType::Tombstone,
1392 TaskType::VnodeWatermark,
1393 ];
1394
1395 pub fn snapshot_time(&self) -> Instant {
1396 self.snapshot_time
1397 }
1398
1399 pub fn pick_compaction_groups_and_type(&self) -> Option<(Vec<CompactionGroupId>, TaskType)> {
1404 let group_ids = self.group_ids_shuffled();
1405 let mut normal_groups = vec![];
1406 for cg_id in group_ids {
1407 if let Some(pick_type) = self.pick_type(cg_id) {
1408 if pick_type == TaskType::Dynamic {
1409 normal_groups.push(cg_id);
1410 } else if normal_groups.is_empty() {
1411 return Some((vec![cg_id], pick_type));
1412 }
1413 }
1414 }
1415 if normal_groups.is_empty() {
1416 None
1417 } else {
1418 Some((normal_groups, TaskType::Dynamic))
1419 }
1420 }
1421
1422 fn group_ids_shuffled(&self) -> Vec<CompactionGroupId> {
1423 let mut group_ids: Vec<_> = self.scheduled.iter().map(|(g, _)| *g).unique().collect();
1424 group_ids.shuffle(&mut thread_rng());
1425 group_ids
1426 }
1427
1428 fn pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1429 Self::TASK_TYPE_PRIORITY
1430 .iter()
1431 .find(|t| self.scheduled.contains(&(group, **t)))
1432 .copied()
1433 }
1434}
1435
1436#[derive(Debug, Default)]
1441pub struct CompactionState {
1442 inner: Mutex<CompactionStateInner>,
1443}
1444
1445#[derive(Debug, Default)]
1446struct CompactionStateInner {
1447 scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
1448 dynamic_cooldown: HashSet<CompactionGroupId>,
1450 last_new_data_time: HashMap<CompactionGroupId, Instant>,
1452}
1453
1454impl CompactionState {
1455 pub fn new() -> Self {
1456 Self {
1457 inner: Default::default(),
1458 }
1459 }
1460
1461 pub fn try_sched_compaction(
1465 &self,
1466 compaction_group: CompactionGroupId,
1467 task_type: TaskType,
1468 trigger: ScheduleTrigger,
1469 ) -> bool {
1470 let mut guard = self.inner.lock();
1471 if task_type == TaskType::Dynamic {
1472 match trigger {
1473 ScheduleTrigger::NewData => {
1474 guard.dynamic_cooldown.remove(&compaction_group);
1475 guard
1476 .last_new_data_time
1477 .insert(compaction_group, Instant::now());
1478 }
1479 ScheduleTrigger::Periodic => {
1480 if guard.dynamic_cooldown.contains(&compaction_group) {
1481 return false;
1482 }
1483 }
1484 }
1485 }
1486 guard.scheduled.insert((compaction_group, task_type))
1487 }
1488
1489 pub fn unschedule(
1492 &self,
1493 compaction_group: CompactionGroupId,
1494 task_type: compact_task::TaskType,
1495 snapshot_time: Instant,
1496 ) {
1497 let mut guard = self.inner.lock();
1498 guard.scheduled.remove(&(compaction_group, task_type));
1499 if task_type == TaskType::Dynamic {
1500 let has_new_data = guard
1501 .last_new_data_time
1502 .get(&compaction_group)
1503 .is_some_and(|t| *t > snapshot_time);
1504 if !has_new_data {
1505 guard.dynamic_cooldown.insert(compaction_group);
1506 }
1507 }
1508 }
1509
1510 pub fn snapshot(&self) -> CompactionScheduleSnapshot {
1512 let guard = self.inner.lock();
1513 let snapshot_time = Instant::now();
1515 CompactionScheduleSnapshot {
1516 scheduled: guard.scheduled.clone(),
1517 snapshot_time,
1518 }
1519 }
1520
1521 pub fn remove_compaction_group(&self, compaction_group: CompactionGroupId) {
1523 let mut guard = self.inner.lock();
1524 guard
1525 .scheduled
1526 .retain(|(group, _)| *group != compaction_group);
1527 guard.dynamic_cooldown.remove(&compaction_group);
1528 guard.last_new_data_time.remove(&compaction_group);
1529 }
1530}
1531
1532impl Compaction {
1533 pub fn get_compact_task_assignments_by_group_id(
1534 &self,
1535 compaction_group_id: CompactionGroupId,
1536 ) -> Vec<CompactTaskAssignment> {
1537 self.compact_task_assignment
1538 .iter()
1539 .filter_map(|(_, assignment)| {
1540 if assignment
1541 .compact_task
1542 .as_ref()
1543 .is_some_and(|task| task.compaction_group_id == compaction_group_id)
1544 {
1545 Some(CompactTaskAssignment {
1546 compact_task: assignment.compact_task.clone(),
1547 context_id: assignment.context_id,
1548 })
1549 } else {
1550 None
1551 }
1552 })
1553 .collect()
1554 }
1555}
1556
/// Aggregated size statistics for one compaction group.
#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    /// Id of the compaction group.
    pub group_id: CompactionGroupId,
    /// Total size of the group, in bytes.
    pub group_size: u64,
    /// Per-state-table size within the group, in bytes.
    pub table_statistic: BTreeMap<StateTableId, u64>,
    /// The group's compaction configuration.
    pub compaction_group_config: CompactionGroup,
}
1564
1565fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1567 table_stats: &mut PbTableStatsMap,
1568 task: &CompactTask,
1569) {
1570 if task.task_type != TaskType::VnodeWatermark {
1571 return;
1572 }
1573 let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
1574 for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1575 assert_eq!(s.table_ids.len(), 1);
1576 let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1577 *e += s.total_key_count;
1578 }
1579 for (table_id, delete_count) in deleted_table_keys {
1580 let Some(stats) = table_stats.get_mut(&table_id) else {
1581 continue;
1582 };
1583 if stats.total_key_count == 0 {
1584 continue;
1585 }
1586 let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1587 let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1588 stats.total_key_count = new_total_key_count;
1590 stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1592 stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1593 }
1594}
1595
/// Health state of a compaction group, derived from the shape of its L0.
#[derive(Debug, Clone)]
pub enum GroupState {
    /// The group is operating normally.
    Normal,
    /// The group is under pressure; the string explains why.
    Emergency(String),
    /// Writes to the group must stop; the string explains why.
    WriteStop(String),
}

impl GroupState {
    /// Whether writes to the group must be stopped.
    pub fn is_write_stop(&self) -> bool {
        matches!(self, GroupState::WriteStop(_))
    }

    /// Whether the group is in the emergency state.
    pub fn is_emergency(&self) -> bool {
        matches!(self, GroupState::Emergency(_))
    }

    /// Human-readable reason for a non-normal state, if any.
    pub fn reason(&self) -> Option<&str> {
        match self {
            GroupState::Normal => None,
            GroupState::Emergency(reason) | GroupState::WriteStop(reason) => {
                Some(reason.as_str())
            }
        }
    }
}
1624
/// Stateless helpers that classify a compaction group's health from its L0
/// shape against the thresholds in its compaction config.
#[derive(Clone, Default)]
pub struct GroupStateValidator;
1627
1628impl GroupStateValidator {
1629 pub fn write_stop_sub_level_count(
1630 level_count: usize,
1631 compaction_config: &CompactionConfig,
1632 ) -> bool {
1633 let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1634 level_count > threshold
1635 }
1636
1637 pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1638 l0_size
1639 > compaction_config
1640 .level0_stop_write_threshold_max_size
1641 .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1642 }
1643
1644 pub fn write_stop_l0_file_count(
1645 l0_file_count: usize,
1646 compaction_config: &CompactionConfig,
1647 ) -> bool {
1648 l0_file_count
1649 > compaction_config
1650 .level0_stop_write_threshold_max_sst_count
1651 .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1652 as usize
1653 }
1654
1655 pub fn emergency_l0_file_count(
1656 l0_file_count: usize,
1657 compaction_config: &CompactionConfig,
1658 ) -> bool {
1659 l0_file_count
1660 > compaction_config
1661 .emergency_level0_sst_file_count
1662 .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1663 as usize
1664 }
1665
1666 pub fn emergency_l0_partition_count(
1667 last_l0_sub_level_partition_count: usize,
1668 compaction_config: &CompactionConfig,
1669 ) -> bool {
1670 last_l0_sub_level_partition_count
1671 > compaction_config
1672 .emergency_level0_sub_level_partition
1673 .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1674 as usize
1675 }
1676
1677 pub fn check_single_group_write_stop(
1678 levels: &Levels,
1679 compaction_config: &CompactionConfig,
1680 ) -> GroupState {
1681 if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1682 return GroupState::WriteStop(format!(
1683 "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1684 levels.l0.sub_levels.len(),
1685 compaction_config.level0_stop_write_threshold_sub_level_number
1686 ));
1687 }
1688
1689 if Self::write_stop_l0_file_count(
1690 levels
1691 .l0
1692 .sub_levels
1693 .iter()
1694 .map(|l| l.table_infos.len())
1695 .sum(),
1696 compaction_config,
1697 ) {
1698 return GroupState::WriteStop(format!(
1699 "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1700 levels
1701 .l0
1702 .sub_levels
1703 .iter()
1704 .map(|l| l.table_infos.len())
1705 .sum::<usize>(),
1706 compaction_config
1707 .level0_stop_write_threshold_max_sst_count
1708 .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1709 ));
1710 }
1711
1712 if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1713 return GroupState::WriteStop(format!(
1714 "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1715 levels.l0.total_file_size,
1716 compaction_config
1717 .level0_stop_write_threshold_max_size
1718 .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1719 ));
1720 }
1721
1722 GroupState::Normal
1723 }
1724
1725 pub fn check_single_group_emergency(
1726 levels: &Levels,
1727 compaction_config: &CompactionConfig,
1728 ) -> GroupState {
1729 if Self::emergency_l0_file_count(
1730 levels
1731 .l0
1732 .sub_levels
1733 .iter()
1734 .map(|l| l.table_infos.len())
1735 .sum(),
1736 compaction_config,
1737 ) {
1738 return GroupState::Emergency(format!(
1739 "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1740 levels
1741 .l0
1742 .sub_levels
1743 .iter()
1744 .map(|l| l.table_infos.len())
1745 .sum::<usize>(),
1746 compaction_config
1747 .emergency_level0_sst_file_count
1748 .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1749 ));
1750 }
1751
1752 if Self::emergency_l0_partition_count(
1753 levels
1754 .l0
1755 .sub_levels
1756 .first()
1757 .map(|l| l.table_infos.len())
1758 .unwrap_or(0),
1759 compaction_config,
1760 ) {
1761 return GroupState::Emergency(format!(
1762 "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1763 levels
1764 .l0
1765 .sub_levels
1766 .first()
1767 .map(|l| l.table_infos.len())
1768 .unwrap_or(0),
1769 compaction_config
1770 .emergency_level0_sub_level_partition
1771 .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1772 ));
1773 }
1774
1775 GroupState::Normal
1776 }
1777
1778 pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1779 let state = Self::check_single_group_write_stop(levels, compaction_config);
1780 if state.is_write_stop() {
1781 return state;
1782 }
1783
1784 Self::check_single_group_emergency(levels, compaction_config)
1785 }
1786}
1787
#[cfg(test)]
mod prefetched_task_id_tests {
    use crate::manager::MetaOpts;

    /// Guards the default `compaction_task_id_refill_capacity` (the number of
    /// task ids prefetched per sequence fetch) against accidental changes.
    #[test]
    fn test_compaction_task_id_refill_capacity_default() {
        assert_eq!(MetaOpts::test(false).compaction_task_id_refill_capacity, 64);
    }
}
1797
#[cfg(test)]
mod compaction_state_tests {
    use risingwave_pb::hummock::compact_task::TaskType;

    use super::*;

    // Scheduling is idempotent per (group, type); unscheduling one type
    // leaves the group's other pending types intact.
    #[test]
    fn test_basic_schedule_and_unschedule() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(!state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));

        let snapshot = state.snapshot();
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Ttl)));

        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        let snapshot2 = state.snapshot();
        assert!(!snapshot2.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot2.scheduled.contains(&(group_id, TaskType::Ttl)));
    }

    // Unscheduling a dynamic task with no new data since the snapshot puts the
    // group on cooldown, which rejects subsequent periodic dynamic triggers.
    #[test]
    fn test_cooldown_blocks_periodic_trigger() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        assert!(!state.try_sched_compaction(
            group_id,
            TaskType::Dynamic,
            ScheduleTrigger::Periodic
        ));
    }

    // A new-data trigger lifts the cooldown and lets dynamic compaction run again.
    #[test]
    fn test_new_data_clears_cooldown() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id));
    }

    // Cooldown applies only to TaskType::Dynamic: non-dynamic types neither
    // enter cooldown on unschedule nor are blocked by an existing cooldown.
    #[test]
    fn test_cooldown_only_affects_dynamic_type() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        let group_id_2: CompactionGroupId = 2.into();
        assert!(state.try_sched_compaction(group_id_2, TaskType::Ttl, ScheduleTrigger::Periodic));
        let snapshot2 = state.snapshot();
        state.unschedule(group_id_2, TaskType::Ttl, snapshot2.snapshot_time());
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id_2));

        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        assert!(state.try_sched_compaction(
            group_id,
            TaskType::SpaceReclaim,
            ScheduleTrigger::Periodic
        ));
    }

    // If new data arrives between snapshot and unschedule, the group must NOT
    // be cooled down — the newer data still needs a dynamic compaction.
    #[test]
    fn test_race_condition_new_data_after_snapshot() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();

        {
            // Simulate new data arriving after the snapshot was taken.
            let mut guard = state.inner.lock();
            guard.last_new_data_time.insert(group_id, Instant::now());
        }

        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(
            !state.inner.lock().dynamic_cooldown.contains(&group_id),
            "Should skip cooldown when new data arrived after snapshot"
        );
    }

    // Dropping a group must erase its scheduled entries, cooldown, and
    // new-data timestamp so no state leaks for reused group ids.
    #[test]
    fn test_remove_compaction_group_cleans_all_state() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        state.inner.lock().dynamic_cooldown.insert(group_id);

        state.remove_compaction_group(group_id);

        let guard = state.inner.lock();
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Ttl)));
        assert!(!guard.dynamic_cooldown.contains(&group_id));
        assert!(!guard.last_new_data_time.contains_key(&group_id));
    }

    // pick_type must follow TASK_TYPE_PRIORITY: each newly scheduled,
    // higher-priority type supersedes the previous pick.
    #[test]
    fn test_snapshot_pick_type_priority() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert_eq!(state.snapshot().pick_type(group_id), None);

        state.try_sched_compaction(
            group_id,
            TaskType::VnodeWatermark,
            ScheduleTrigger::Periodic,
        );
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::VnodeWatermark)
        );

        state.try_sched_compaction(group_id, TaskType::Tombstone, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Tombstone)
        );

        state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic);
        assert_eq!(state.snapshot().pick_type(group_id), Some(TaskType::Ttl));

        state.try_sched_compaction(group_id, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::SpaceReclaim)
        );

        state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Dynamic)
        );
    }

    // Cooldown is tracked per group: cooling down one group must not affect
    // another.
    #[test]
    fn test_multiple_groups_independent_cooldown() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);
        let snapshot = state.snapshot();

        state.unschedule(g1, TaskType::Dynamic, snapshot.snapshot_time());

        let guard = state.inner.lock();
        assert!(guard.dynamic_cooldown.contains(&g1));
        assert!(!guard.dynamic_cooldown.contains(&g2));
    }

    // An empty snapshot yields no pick.
    #[test]
    fn test_pick_compaction_groups_empty() {
        let state = CompactionState::new();
        let snapshot = state.snapshot();
        assert!(snapshot.pick_compaction_groups_and_type().is_none());
    }

    // With both dynamic and non-dynamic groups pending, the result depends on
    // the shuffled visit order: either all dynamic groups batched together, or
    // the single non-dynamic group encountered first.
    #[test]
    fn test_pick_compaction_groups_mixed_types() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();
        let g3: CompactionGroupId = 3.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Ttl, ScheduleTrigger::Periodic);
        state.try_sched_compaction(g3, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();

        if task_type == TaskType::Dynamic {
            assert!(groups.contains(&g1));
            assert!(groups.contains(&g3));
            assert!(!groups.contains(&g2)); } else {
            assert_eq!(task_type, TaskType::Ttl);
            assert_eq!(groups, vec![g2]);
        }
    }

    // All-dynamic pending groups are always batched into one pick.
    #[test]
    fn test_pick_compaction_groups_all_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::Dynamic);
        assert!(groups.contains(&g1));
        assert!(groups.contains(&g2));
    }

    // A lone non-dynamic group is picked alone with its own type.
    #[test]
    fn test_pick_compaction_groups_single_non_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();

        state.try_sched_compaction(g1, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::SpaceReclaim);
        assert_eq!(groups, vec![g1]);
    }
}