1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17use std::time::Instant;
18
19use anyhow::Context;
20use compaction_event_loop::{
21 HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
22 HummockCompactorDedicatedEventLoop,
23};
24use fail::fail_point;
25use itertools::Itertools;
26use parking_lot::Mutex;
27use rand::rng as thread_rng;
28use rand::seq::SliceRandom;
29use risingwave_common::catalog::TableId;
30use risingwave_common::config::meta::default::compaction_config;
31use risingwave_common::util::epoch::Epoch;
32use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
33use risingwave_hummock_sdk::compaction_group::StateTableId;
34use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_table_watermarks_impl;
35use risingwave_hummock_sdk::level::Levels;
36use risingwave_hummock_sdk::sstable_info::SstableInfo;
37use risingwave_hummock_sdk::table_stats::{
38 PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
39};
40use risingwave_hummock_sdk::table_watermark::TableWatermarks;
41use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
42use risingwave_hummock_sdk::{
43 CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
44 HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
45};
46use risingwave_meta_model::hummock_sequence::COMPACTION_TASK_ID;
47use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
48use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
49use risingwave_pb::hummock::{
50 CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
51 SubscribeCompactionEventRequest, TableOption, compact_task,
52};
53use thiserror_ext::AsReport;
54use tokio::sync::RwLockWriteGuard;
55use tokio::sync::mpsc::UnboundedReceiver;
56use tokio::sync::oneshot::{Receiver, Sender};
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use crate::hummock::compaction::selector::level_selector::PickerInfo;
62use crate::hummock::compaction::selector::{
63 DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
64 ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
65 TtlCompactionSelector, VnodeWatermarkCompactionSelector,
66};
67use crate::hummock::compaction::{
68 CompactStatus, CompactionDeveloperConfig, CompactionSelector,
69 CompactionTask as PickedCompactionTask,
70};
71use crate::hummock::error::{Error, Result};
72use crate::hummock::manager::CompactionTaskReportResult;
73use crate::hummock::manager::compaction::compact_task_builder::{
74 CompactTaskBuildContext, attach_compact_task_table_metadata, build_base_compact_task,
75};
76use crate::hummock::manager::transaction::{
77 HummockVersionStatsTransaction, HummockVersionTransaction,
78};
79use crate::hummock::manager::versioning::Versioning;
80use crate::hummock::metrics_utils::{
81 build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
82 trigger_local_table_stat,
83};
84use crate::hummock::model::CompactionGroup;
85use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
86use crate::manager::META_NODE_ID;
87use crate::model::BTreeMapTransaction;
88
/// Outcome of `HummockManager::trigger_manual_compaction` when it does not fail outright.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ManualCompactionTriggerResult {
    /// A task was sent to a compactor and its report was received with a status other than
    /// the "no available CPU/memory resource" cancellations.
    Submitted,
    /// Nothing ran this time: either an exclusive request found no task because it was blocked
    /// by pending work, or the compactor cancelled the task for lack of CPU/memory resources.
    Retry,
}
94
95mod compact_task_builder;
96pub mod compaction_event_loop;
97pub mod compaction_group_manager;
98pub mod compaction_group_schedule;
99
100static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
101 [
102 TaskStatus::ManualCanceled,
103 TaskStatus::SendFailCanceled,
104 TaskStatus::AssignFailCanceled,
105 TaskStatus::HeartbeatCanceled,
106 TaskStatus::InvalidGroupCanceled,
107 TaskStatus::NoAvailMemoryResourceCanceled,
108 TaskStatus::NoAvailCpuResourceCanceled,
109 TaskStatus::HeartbeatProgressCanceled,
110 ]
111 .into_iter()
112 .collect()
113});
114
115fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
116 let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
117 HashMap::default();
118 compaction_selectors.insert(
119 compact_task::TaskType::Dynamic,
120 Box::<DynamicLevelSelector>::default(),
121 );
122 compaction_selectors.insert(
123 compact_task::TaskType::SpaceReclaim,
124 Box::<SpaceReclaimCompactionSelector>::default(),
125 );
126 compaction_selectors.insert(
127 compact_task::TaskType::Ttl,
128 Box::<TtlCompactionSelector>::default(),
129 );
130 compaction_selectors.insert(
131 compact_task::TaskType::Tombstone,
132 Box::<TombstoneCompactionSelector>::default(),
133 );
134 compaction_selectors.insert(
135 compact_task::TaskType::VnodeWatermark,
136 Box::<VnodeWatermarkCompactionSelector>::default(),
137 );
138 compaction_selectors
139}
140
141enum BuiltCompactTask {
142 MetaFinished(CompactTask),
143 PendingAssignment(CompactTask),
144}
145
146impl HummockVersionTransaction<'_> {
147 fn apply_compact_task(&mut self, compact_task: &CompactTask) {
148 let mut version_delta = self.new_delta();
149 let trivial_move = compact_task.is_trivial_move_task();
150 version_delta.trivial_move = trivial_move;
151
152 let group_deltas = &mut version_delta
153 .group_deltas
154 .entry(compact_task.compaction_group_id)
155 .or_default()
156 .group_deltas;
157 let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
158 BTreeMap::default();
159
160 for level in &compact_task.input_ssts {
161 let level_idx = level.level_idx;
162
163 removed_table_ids_map
164 .entry(level_idx)
165 .or_default()
166 .extend(level.table_infos.iter().map(|sst| sst.sst_id));
167 }
168
169 for (level_idx, removed_table_ids) in removed_table_ids_map {
170 let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
171 level_idx,
172 0, removed_table_ids,
174 vec![], 0, compact_task.compaction_group_version_id,
177 ));
178
179 group_deltas.push(group_delta);
180 }
181
182 let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
183 compact_task.target_level,
184 compact_task.target_sub_level_id,
185 HashSet::new(), compact_task.sorted_output_ssts.clone(),
187 compact_task.split_weight_by_vnode,
188 compact_task.compaction_group_version_id,
189 ));
190
191 group_deltas.push(group_delta);
192 version_delta.pre_apply();
193 }
194}
195
196#[derive(Default)]
197pub struct Compaction {
198 pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
200 pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,
202
203 pub _deterministic_mode: bool,
204}
205
206impl HummockManager {
207 pub async fn get_assigned_compact_task_num(&self) -> u64 {
208 self.compaction.read().await.compact_task_assignment.len() as u64
209 }
210
211 pub async fn list_compaction_status(
212 &self,
213 ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
214 let compaction = self.compaction.read().await;
215 (
216 compaction.compaction_statuses.values().map_into().collect(),
217 compaction
218 .compact_task_assignment
219 .values()
220 .cloned()
221 .collect(),
222 )
223 }
224
225 pub async fn get_compaction_scores(
226 &self,
227 compaction_group_id: CompactionGroupId,
228 ) -> Vec<PickerInfo> {
229 let (status, levels, group) = {
230 let compaction = self.compaction.read().await;
231 let versioning = self.versioning.read().await;
232 let config_manager = self.compaction_group_manager.read().await;
233 match (
234 compaction.compaction_statuses.get(&compaction_group_id),
235 versioning.current_version.levels.get(&compaction_group_id),
236 config_manager.try_get_compaction_group_config(compaction_group_id),
237 ) {
238 (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
239 _ => {
240 return vec![];
241 }
242 }
243 };
244 let dynamic_level_core = DynamicLevelSelectorCore::new(
245 group.compaction_config,
246 Arc::new(CompactionDeveloperConfig::default()),
247 );
248 let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
249 ctx.score_levels
250 }
251}
252
253impl HummockManager {
254 pub fn compaction_event_loop(
255 hummock_manager: Arc<Self>,
256 compactor_streams_change_rx: UnboundedReceiver<(
257 HummockContextId,
258 Streaming<SubscribeCompactionEventRequest>,
259 )>,
260 ) -> Vec<(JoinHandle<()>, Sender<()>)> {
261 let mut join_handle_vec = Vec::default();
262
263 let hummock_compaction_event_handler =
264 HummockCompactionEventHandler::new(hummock_manager.clone());
265
266 let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
267 hummock_manager.clone(),
268 hummock_compaction_event_handler.clone(),
269 );
270
271 let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
272 join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));
273
274 let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
275 hummock_manager.env.opts.clone(),
276 hummock_compaction_event_handler,
277 Some(event_tx),
278 );
279
280 let event_loop = HummockCompactionEventLoop::new(
281 hummock_compaction_event_dispatcher,
282 hummock_manager.metrics.clone(),
283 compactor_streams_change_rx,
284 );
285
286 let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
287 join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
288
289 join_handle_vec
290 }
291
292 pub fn add_compactor_stream(
293 &self,
294 context_id: HummockContextId,
295 req_stream: Streaming<SubscribeCompactionEventRequest>,
296 ) {
297 self.compactor_streams_change_tx
298 .send((context_id, req_stream))
299 .unwrap();
300 }
301}
302
303impl HummockManager {
304 async fn next_compaction_task_id_with_prefetch(&self, refill_capacity: u32) -> Result<u64> {
307 self.prefetched_compaction_task_ids
308 .next(refill_capacity, |count| async move {
309 self.env
310 .hummock_seq
311 .next_interval(COMPACTION_TASK_ID, count)
312 .await
313 })
314 .await
315 }
316
317 pub async fn get_compact_tasks_impl(
318 &self,
319 compaction_groups: Vec<CompactionGroupId>,
320 max_select_count: usize,
321 selector: &mut dyn CompactionSelector,
322 ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
323 let deterministic_mode = self.env.opts.compaction_deterministic_test;
324
325 let mut compaction_guard = self.compaction.write().await;
326 let compaction: &mut Compaction = &mut compaction_guard;
327 let mut versioning_guard = self.versioning.write().await;
328 let versioning: &mut Versioning = &mut versioning_guard;
329
330 let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");
331
332 let start_time = Instant::now();
333 let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
334
335 let mut compact_task_assignment =
336 BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
337
338 let mut version = HummockVersionTransaction::new(
339 &mut versioning.current_version,
340 &mut versioning.hummock_version_deltas,
341 &mut versioning.table_change_log,
342 self.env.notification_manager(),
343 None,
344 &self.metrics,
345 &self.env.opts,
346 );
347 let mut version_stats = HummockVersionStatsTransaction::new(
349 &mut versioning.version_stats,
350 self.env.notification_manager(),
351 );
352
353 if deterministic_mode {
354 version.disable_apply_to_txn();
355 }
356 let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
357 self.metadata_manager
358 .catalog_controller
359 .get_versioned_table_schemas()
360 .await
361 .map_err(|e| Error::Internal(e.into()))?
362 } else {
363 HashMap::default()
364 };
365 let mut unschedule_groups = vec![];
366 let mut trivial_tasks = vec![];
367 let mut pick_tasks = vec![];
368 let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
369 &self.env.opts,
370 ));
371 'outside: for compaction_group_id in compaction_groups {
376 if pick_tasks.len() >= max_select_count {
377 break;
378 }
379
380 if !version
381 .latest_version()
382 .levels
383 .contains_key(&compaction_group_id)
384 {
385 continue;
386 }
387
388 let group_config = {
392 let config_manager = self.compaction_group_manager.read().await;
393
394 match config_manager.try_get_compaction_group_config(compaction_group_id) {
395 Some(config) => config,
396 None => continue,
397 }
398 };
399
400 let task_id = self
402 .next_compaction_task_id_with_prefetch(
403 self.env.opts.compaction_task_id_refill_capacity,
404 )
405 .await?;
406
407 if !compaction_statuses.contains_key(&compaction_group_id) {
408 compaction_statuses.insert(
410 compaction_group_id,
411 CompactStatus::new(
412 compaction_group_id,
413 group_config.compaction_config.max_level,
414 ),
415 );
416 }
417 let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();
418
419 let mut stats = LocalSelectorStatistic::default();
420 let member_table_ids: Vec<_> = version
421 .latest_version()
422 .state_table_info
423 .compaction_group_member_table_ids(compaction_group_id)
424 .iter()
425 .copied()
426 .collect();
427
428 let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();
429
430 {
431 let guard = self.table_id_to_table_option.read();
432 for table_id in &member_table_ids {
433 if let Some(opts) = guard.get(table_id) {
434 table_id_to_option.insert(*table_id, *opts);
435 }
436 }
437 }
438
439 while let Some(picked_task) = compact_status.get_compact_task(
440 version
441 .latest_version()
442 .get_compaction_group_levels(compaction_group_id),
443 version
444 .latest_version()
445 .state_table_info
446 .compaction_group_member_table_ids(compaction_group_id),
447 task_id as HummockCompactionTaskId,
448 &group_config,
449 &mut stats,
450 selector,
451 &table_id_to_option,
452 developer_config.clone(),
453 &version.latest_version().table_watermarks,
454 &version.latest_version().state_table_info,
455 ) {
456 let compaction_group_levels = version
457 .latest_version()
458 .get_compaction_group_levels(compaction_group_id);
459 let target_level_id = picked_task.input.target_level as u32;
460 let is_target_level_last = compaction_group_levels.is_last_level(target_level_id);
461 let table_options = table_id_to_option
462 .iter()
463 .map(|(table_id, table_option)| (*table_id, TableOption::from(table_option)))
464 .collect();
465 let built_compact_task = self.build_ready_compact_task(
466 picked_task,
467 CompactTaskBuildContext {
468 task_id,
469 compaction_group_id: group_config.group_id,
470 compaction_group_version_id: compaction_group_levels
471 .compaction_group_version_id,
472 existing_table_ids: member_table_ids.clone(),
473 table_options,
474 is_target_level_last,
475 compaction_config: group_config.compaction_config.clone(),
476 current_epoch_time: Epoch::now().0,
477 },
478 &version.latest_version().table_watermarks,
479 &all_versioned_table_schemas,
480 );
481
482 match built_compact_task {
483 BuiltCompactTask::MetaFinished(compact_task) => {
484 let label = compact_task.task_label();
485 tracing::debug!(
486 "{} for compaction group {}: input: {:?}, cost time: {:?}",
487 label,
488 compact_task.compaction_group_id,
489 compact_task.input_ssts,
490 start_time.elapsed()
491 );
492 compact_status.report_compact_task(&compact_task);
493 update_table_stats_for_vnode_watermark_trivial_reclaim(
494 &mut version_stats.table_stats,
495 &compact_task,
496 );
497 self.metrics
498 .compact_frequency
499 .with_label_values(&[
500 label,
501 &compact_task.compaction_group_id.to_string(),
502 selector.task_type().as_str_name(),
503 "SUCCESS",
504 ])
505 .inc();
506
507 version.apply_compact_task(&compact_task);
508 trivial_tasks.push(compact_task);
509 if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop
510 {
511 break 'outside;
512 }
513 }
514 BuiltCompactTask::PendingAssignment(compact_task) => {
515 compact_task_assignment.insert(
516 compact_task.task_id,
517 CompactTaskAssignment {
518 compact_task: Some(compact_task.clone().into()),
519 context_id: META_NODE_ID, },
521 );
522
523 pick_tasks.push(compact_task);
524 break;
525 }
526 }
527
528 stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
529 stats = LocalSelectorStatistic::default();
530 }
531 if pick_tasks
532 .last()
533 .map(|task| task.compaction_group_id != compaction_group_id)
534 .unwrap_or(true)
535 {
536 unschedule_groups.push(compaction_group_id);
537 }
538 stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
539 }
540
541 if !trivial_tasks.is_empty() {
542 commit_multi_var!(
543 self.meta_store_ref(),
544 compaction_statuses,
545 compact_task_assignment,
546 version,
547 version_stats
548 )?;
549 self.metrics
550 .compact_task_batch_count
551 .with_label_values(&["batch_trivial_move"])
552 .observe(trivial_tasks.len() as f64);
553
554 for trivial_task in &trivial_tasks {
555 self.metrics
556 .compact_task_trivial_move_sst_count
557 .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
558 .observe(trivial_task.input_ssts[0].table_infos.len() as _);
559 }
560
561 drop(versioning_guard);
562 } else {
563 drop(versioning_guard);
566 commit_multi_var!(
567 self.meta_store_ref(),
568 compaction_statuses,
569 compact_task_assignment
570 )?;
571 }
572 drop(compaction_guard);
573 if !pick_tasks.is_empty() {
574 self.metrics
575 .compact_task_batch_count
576 .with_label_values(&["batch_get_compact_task"])
577 .observe(pick_tasks.len() as f64);
578 }
579
580 for compact_task in &mut pick_tasks {
581 let compaction_group_id = compact_task.compaction_group_id;
582
583 self.compactor_manager
585 .initiate_task_heartbeat(compact_task.clone());
586
587 compact_task.task_status = TaskStatus::Pending;
589 let compact_task_statistics = statistics_compact_task(compact_task);
590
591 let level_type_label = build_compact_task_level_type_metrics_label(
592 compact_task.input_ssts[0].level_idx as usize,
593 compact_task.input_ssts.last().unwrap().level_idx as usize,
594 );
595
596 let level_count = compact_task.input_ssts.len();
597 if compact_task.input_ssts[0].level_idx == 0 {
598 self.metrics
599 .l0_compact_level_count
600 .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
601 .observe(level_count as _);
602 }
603
604 self.metrics
605 .compact_task_size
606 .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
607 .observe(compact_task_statistics.total_file_size as _);
608
609 self.metrics
610 .compact_task_size
611 .with_label_values(&[
612 &compaction_group_id.to_string(),
613 &format!("{} uncompressed", level_type_label),
614 ])
615 .observe(compact_task_statistics.total_uncompressed_file_size as _);
616
617 self.metrics
618 .compact_task_file_count
619 .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
620 .observe(compact_task_statistics.total_file_count as _);
621
622 tracing::trace!(
623 "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
624 compaction_group_id,
625 level_count,
626 compact_task.input_ssts[0].level_type.as_str_name(),
627 compact_task.input_ssts[0].level_idx,
628 compact_task.target_level,
629 start_time.elapsed(),
630 compact_task_statistics
631 );
632 }
633
634 #[cfg(test)]
635 {
636 self.check_state_consistency().await;
637 }
638 pick_tasks.extend(trivial_tasks);
639 Ok((pick_tasks, unschedule_groups))
640 }
641
642 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
644 fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
645 anyhow::anyhow!("failpoint metastore err")
646 )));
647 let ret = self
648 .cancel_compact_task_impl(vec![task_id], task_status)
649 .await?;
650 Ok(ret[0])
651 }
652
653 pub async fn cancel_compact_tasks(
654 &self,
655 tasks: Vec<u64>,
656 task_status: TaskStatus,
657 ) -> Result<Vec<bool>> {
658 self.cancel_compact_task_impl(tasks, task_status).await
659 }
660
661 async fn cancel_compact_task_impl(
662 &self,
663 task_ids: Vec<u64>,
664 task_status: TaskStatus,
665 ) -> Result<Vec<bool>> {
666 assert!(CANCEL_STATUS_SET.contains(&task_status));
667 let tasks = task_ids
668 .into_iter()
669 .map(|task_id| ReportTask {
670 task_id,
671 task_status,
672 sorted_output_ssts: vec![],
673 table_stats_change: HashMap::default(),
674 object_timestamps: HashMap::default(),
675 })
676 .collect_vec();
677 let rets = self.report_compact_tasks(tasks).await?;
678 #[cfg(test)]
679 {
680 self.check_state_consistency().await;
681 }
682 Ok(rets)
683 }
684
685 async fn get_compact_tasks(
686 &self,
687 mut compaction_groups: Vec<CompactionGroupId>,
688 max_select_count: usize,
689 selector: &mut dyn CompactionSelector,
690 ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
691 fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
692 anyhow::anyhow!("failpoint metastore error")
693 )));
694 compaction_groups.shuffle(&mut thread_rng());
695 let (mut tasks, groups) = self
696 .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
697 .await?;
698 tasks.retain(|task| {
699 if task.task_status == TaskStatus::Success {
700 debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
701 false
702 } else {
703 true
704 }
705 });
706 Ok((tasks, groups))
707 }
708
709 pub async fn get_compact_task(
710 &self,
711 compaction_group_id: CompactionGroupId,
712 selector: &mut dyn CompactionSelector,
713 ) -> Result<Option<CompactTask>> {
714 fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
715 anyhow::anyhow!("failpoint metastore error")
716 )));
717
718 let (normal_tasks, _) = self
719 .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
720 .await?;
721 for task in normal_tasks {
722 if task.task_status != TaskStatus::Success {
723 return Ok(Some(task));
724 }
725 debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
726 }
727 Ok(None)
728 }
729
730 pub async fn manual_get_compact_task(
731 &self,
732 compaction_group_id: CompactionGroupId,
733 manual_compaction_option: ManualCompactionOption,
734 ) -> Result<Option<CompactTask>> {
735 let (task, _) = self
736 .manual_get_compact_task_with_info(compaction_group_id, manual_compaction_option)
737 .await?;
738 Ok(task)
739 }
740
741 pub async fn manual_get_compact_task_with_info(
742 &self,
743 compaction_group_id: CompactionGroupId,
744 manual_compaction_option: ManualCompactionOption,
745 ) -> Result<(Option<CompactTask>, bool)> {
746 let mut selector = ManualCompactionSelector::new(manual_compaction_option);
747 let task = self
748 .get_compact_task(compaction_group_id, &mut selector)
749 .await?;
750 if let Some(err) = selector.validation_error() {
751 return Err(Error::InvalidManualCompactionOption(err.to_owned()));
752 }
753 Ok((task, selector.blocked_by_pending()))
754 }
755
756 pub async fn report_compact_task(
757 &self,
758 task_id: u64,
759 task_status: TaskStatus,
760 sorted_output_ssts: Vec<SstableInfo>,
761 table_stats_change: Option<PbTableStatsMap>,
762 object_timestamps: HashMap<HummockSstableObjectId, u64>,
763 ) -> Result<bool> {
764 let rets = self
765 .report_compact_tasks(vec![ReportTask {
766 task_id,
767 task_status,
768 sorted_output_ssts,
769 table_stats_change: table_stats_change.unwrap_or_default(),
770 object_timestamps,
771 }])
772 .await?;
773 Ok(rets[0])
774 }
775
776 pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
777 let compaction_guard = self.compaction.write().await;
778 let versioning_guard = self.versioning.write().await;
779
780 self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
781 .await
782 }
783
784 pub async fn report_compact_tasks_impl(
792 &self,
793 report_tasks: Vec<ReportTask>,
794 mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
795 mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
796 ) -> Result<Vec<bool>> {
797 let deterministic_mode = self.env.opts.compaction_deterministic_test;
798 let compaction: &mut Compaction = &mut compaction_guard;
799 let start_time = Instant::now();
800 let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
801 let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
802 let mut rets = vec![false; report_tasks.len()];
803 let mut compact_task_assignment =
804 BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
805 let versioning: &mut Versioning = &mut versioning_guard;
807 let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");
808
809 for group_id in original_keys {
811 if !versioning.current_version.levels.contains_key(&group_id) {
812 compact_statuses.remove(group_id);
813 }
814 }
815 let mut tasks = vec![];
816
817 let mut version = HummockVersionTransaction::new(
818 &mut versioning.current_version,
819 &mut versioning.hummock_version_deltas,
820 &mut versioning.table_change_log,
821 self.env.notification_manager(),
822 None,
823 &self.metrics,
824 &self.env.opts,
825 );
826
827 if deterministic_mode {
828 version.disable_apply_to_txn();
829 }
830
831 let mut version_stats = HummockVersionStatsTransaction::new(
832 &mut versioning.version_stats,
833 self.env.notification_manager(),
834 );
835 let mut success_count = 0;
836 let mut report_results = Vec::with_capacity(rets.len());
837 for (idx, task) in report_tasks.into_iter().enumerate() {
838 rets[idx] = true;
839 let task_id = task.task_id;
840 let mut task_status = task.task_status;
841 let mut compact_task = match compact_task_assignment.remove(task.task_id) {
842 Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
843 None => {
844 tracing::warn!("{}", format!("compact task {} not found", task.task_id));
845 rets[idx] = false;
846 report_results.push(CompactionTaskReportResult {
847 task_id,
848 task_status,
849 reported: false,
850 });
851 continue;
852 }
853 };
854
855 {
856 compact_task.task_status = task.task_status;
858 compact_task.sorted_output_ssts = task.sorted_output_ssts;
859 }
860
861 match compact_statuses.get_mut(compact_task.compaction_group_id) {
862 Some(mut compact_status) => {
863 compact_status.report_compact_task(&compact_task);
864 }
865 None => {
866 compact_task.task_status = TaskStatus::InvalidGroupCanceled;
872 }
873 }
874
875 let is_success = if let TaskStatus::Success = compact_task.task_status {
876 match self
877 .report_compaction_sanity_check(&task.object_timestamps)
878 .await
879 {
880 Err(e) => {
881 warn!(
882 "failed to commit compaction task {} {}",
883 compact_task.task_id,
884 e.as_report()
885 );
886 compact_task.task_status = TaskStatus::RetentionTimeRejected;
887 false
888 }
889 _ => {
890 let group = version
891 .latest_version()
892 .levels
893 .get(&compact_task.compaction_group_id)
894 .unwrap();
895 let is_expired = compact_task.is_expired(group.compaction_group_version_id);
896 if is_expired {
897 compact_task.task_status = TaskStatus::InputOutdatedCanceled;
898 warn!(
899 "The task may be expired because of group split, task:\n {:?}",
900 compact_task_to_string(&compact_task)
901 );
902 }
903 !is_expired
904 }
905 }
906 } else {
907 false
908 };
909 if is_success {
910 success_count += 1;
911 version.apply_compact_task(&compact_task);
912 if purge_prost_table_stats(
913 &mut version_stats.table_stats,
914 version.latest_version(),
915 &HashSet::default(),
916 ) {
917 self.metrics.version_stats.reset();
918 versioning.local_metrics.clear();
919 }
920 add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
921 trigger_local_table_stat(
922 &self.metrics,
923 &mut versioning.local_metrics,
924 &version_stats,
925 &task.table_stats_change,
926 );
927 }
928 task_status = compact_task.task_status;
929 report_results.push(CompactionTaskReportResult {
930 task_id,
931 task_status,
932 reported: rets[idx],
933 });
934 tasks.push(compact_task);
935 }
936 if success_count > 0 {
937 commit_multi_var!(
938 self.meta_store_ref(),
939 compact_statuses,
940 compact_task_assignment,
941 version,
942 version_stats
943 )?;
944
945 self.metrics
946 .compact_task_batch_count
947 .with_label_values(&["batch_report_task"])
948 .observe(success_count as f64);
949 } else {
950 commit_multi_var!(
952 self.meta_store_ref(),
953 compact_statuses,
954 compact_task_assignment
955 )?;
956 }
957
958 self.notify_compaction_task_report_waiters(report_results);
959
960 let mut success_groups = vec![];
961 for compact_task in &tasks {
962 self.compactor_manager
963 .remove_task_heartbeat(compact_task.task_id);
964 tracing::trace!(
965 "Reported compaction task. {}. cost time: {:?}",
966 compact_task_to_string(compact_task),
967 start_time.elapsed(),
968 );
969
970 if !deterministic_mode
971 && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
972 || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
973 {
974 self.try_send_compaction_request(
976 compact_task.compaction_group_id,
977 compact_task::TaskType::Dynamic,
978 );
979 }
980
981 if compact_task.task_status == TaskStatus::Success {
982 success_groups.push(compact_task.compaction_group_id);
983 }
984 }
985
986 trigger_compact_tasks_stat(
987 &self.metrics,
988 &tasks,
989 &compaction.compaction_statuses,
990 &versioning_guard.current_version,
991 );
992 drop(versioning_guard);
993 if !success_groups.is_empty() {
994 self.try_update_write_limits(&success_groups).await;
995 }
996 Ok(rets)
997 }
998
999 pub async fn trigger_compaction_deterministic(
1002 &self,
1003 _base_version_id: HummockVersionId,
1004 compaction_groups: Vec<CompactionGroupId>,
1005 ) -> Result<()> {
1006 self.on_current_version(|old_version| {
1007 tracing::info!(
1008 "Trigger compaction for version {}, groups {:?}",
1009 old_version.id,
1010 compaction_groups
1011 );
1012 })
1013 .await;
1014
1015 if compaction_groups.is_empty() {
1016 return Ok(());
1017 }
1018 for compaction_group in compaction_groups {
1019 self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1020 }
1021 Ok(())
1022 }
1023
 /// Triggers one manual compaction for `compaction_group` and waits for its report.
 ///
 /// The flow is: pick the next compactor, build a manual compact task, register a report
 /// waiter for the task id, send the task to the compactor, then await the report.
 ///
 /// Returns `ManualCompactionTriggerResult::Retry` when no task could be picked while the
 /// option is `exclusive` and the picker reported `blocked_by_pending`, or when the report
 /// carries `NoAvailCpuResourceCanceled` / `NoAvailMemoryResourceCanceled`. Any other
 /// accepted report yields `ManualCompactionTriggerResult::Submitted`.
 ///
 /// # Errors
 ///
 /// Fails when no compactor is available, when no task can be produced, when the task cannot
 /// be sent, when the report channel is closed, or when the report is not marked `reported`.
 /// `Error::InvalidManualCompactionOption` is returned unchanged; other task-building errors
 /// are wrapped with the compaction group as context.
 pub async fn trigger_manual_compaction(
 &self,
 compaction_group: CompactionGroupId,
 manual_compaction_option: ManualCompactionOption,
 ) -> Result<ManualCompactionTriggerResult> {
 let start_time = Instant::now();
 // Read the flag up front: the option itself is moved into the task picker below.
 let exclusive = manual_compaction_option.exclusive;

 // Without a compactor there is nothing to send the task to, so fail before picking one.
 let compactor = match self.compactor_manager.next_compactor() {
 Some(compactor) => compactor,
 None => {
 tracing::warn!("trigger_manual_compaction No compactor is available.");
 return Err(anyhow::anyhow!(
 "trigger_manual_compaction No compactor is available. compaction_group {}",
 compaction_group
 )
 .into());
 }
 };

 let compact_task = self
 .manual_get_compact_task_with_info(compaction_group, manual_compaction_option)
 .await;
 let (compact_task, blocked_by_pending) = match compact_task {
 Ok((compact_task, blocked_by_pending)) => (compact_task, blocked_by_pending),
 Err(err) => {
 tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
 // Invalid options are passed through as-is so the caller can match on the variant.
 if matches!(err, Error::InvalidManualCompactionOption(_)) {
 return Err(err);
 }

 return Err(anyhow::anyhow!(err)
 .context(format!(
 "Failed to get compaction task for compaction_group {}",
 compaction_group,
 ))
 .into());
 }
 };
 let compact_task = match compact_task {
 Some(compact_task) => compact_task,
 None => {
 // An exclusive request that was blocked by pending work is retryable, not an error.
 if exclusive && blocked_by_pending {
 return Ok(ManualCompactionTriggerResult::Retry);
 }
 return Err(anyhow::anyhow!(
 "trigger_manual_compaction No compaction_task is available. compaction_group {}",
 compaction_group
 )
 .into());
 }
 };

 // Keep the id: `compact_task` is moved into the event sent to the compactor.
 let task_id = compact_task.task_id;
 let compact_task_string = compact_task_to_string(&compact_task);
 tracing::info!(
 compact_task_string,
 duration = ?start_time.elapsed(),
 "Triggered manual compaction task."
 );

 // The waiter is registered before the task is sent, and removed again if sending fails
 // or if the receiving side reports an error.
 let report_rx = self.register_compaction_task_report_waiter(task_id);
 if let Err(err) = compactor
 .send_event(ResponseEvent::CompactTask(compact_task.into()))
 .with_context(|| {
 format!(
 "Failed to trigger compaction task for compaction_group {}",
 compaction_group,
 )
 })
 {
 self.remove_compaction_task_report_waiter(task_id);
 return Err(err.into());
 }

 let report_result = match report_rx.await {
 Ok(result) => result,
 Err(_) => {
 self.remove_compaction_task_report_waiter(task_id);
 return Err(anyhow::anyhow!(
 "trigger_manual_compaction wait report failed. compaction_group {}",
 compaction_group
 )
 .into());
 }
 };
 if !report_result.reported {
 return Err(anyhow::anyhow!(
 "trigger_manual_compaction report not accepted. task_id {}",
 report_result.task_id
 )
 .into());
 }

 // Cancellation for lack of compactor CPU/memory is reported as retryable.
 if report_result.task_status == TaskStatus::NoAvailCpuResourceCanceled
 || report_result.task_status == TaskStatus::NoAvailMemoryResourceCanceled
 {
 return Ok(ManualCompactionTriggerResult::Retry);
 }

 tracing::info!(
 ?report_result,
 duration = ?start_time.elapsed(),
 "Completed manual compaction task."
 );

 Ok(ManualCompactionTriggerResult::Submitted)
 }
1136
1137 pub fn try_send_compaction_request(
1139 &self,
1140 compaction_group: CompactionGroupId,
1141 task_type: compact_task::TaskType,
1142 ) -> bool {
1143 self.compaction_state.try_sched_compaction(
1144 compaction_group,
1145 task_type,
1146 ScheduleTrigger::NewData,
1147 )
1148 }
1149
1150 fn apply_split_weight_by_vnode_partition(
1153 &self,
1154 compact_task: &mut CompactTask,
1155 compaction_config: &CompactionConfig,
1156 compact_table_ids: &[TableId],
1157 ) {
1158 if compaction_config.split_weight_by_vnode > 0 {
1159 for table_id in compact_table_ids {
1160 compact_task
1161 .table_vnode_partition
1162 .insert(*table_id, compact_task.split_weight_by_vnode);
1163 }
1164
1165 return;
1166 }
1167
1168 let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
1170 for input_ssts in &compact_task.input_ssts {
1171 for sst in &input_ssts.table_infos {
1172 for table_id in &sst.table_ids {
1173 *table_size_info.entry(*table_id).or_default() +=
1174 sst.sst_size / (sst.table_ids.len() as u64);
1175 }
1176 }
1177 }
1178
1179 let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
1180 let default_partition_count = self.env.opts.partition_vnode_count;
1181 let compact_task_table_size_partition_threshold_low = self
1182 .env
1183 .opts
1184 .compact_task_table_size_partition_threshold_low;
1185 let compact_task_table_size_partition_threshold_high = self
1186 .env
1187 .opts
1188 .compact_task_table_size_partition_threshold_high;
1189
1190 let table_write_throughput_statistic_manager =
1192 self.table_write_throughput_statistic_manager.read();
1193 let timestamp = chrono::Utc::now().timestamp();
1194
1195 for (table_id, compact_table_size) in table_size_info {
1196 let write_throughput = table_write_throughput_statistic_manager
1197 .get_table_throughput_descending(table_id, timestamp)
1198 .peekable()
1199 .peek()
1200 .map(|item| item.throughput)
1201 .unwrap_or(0);
1202
1203 if compact_table_size > compact_task_table_size_partition_threshold_high
1204 && default_partition_count > 0
1205 {
1206 compact_task
1207 .table_vnode_partition
1208 .insert(table_id, default_partition_count);
1209 } else if (compact_table_size > compact_task_table_size_partition_threshold_low
1210 || (write_throughput > self.env.opts.table_high_write_throughput_threshold
1211 && compact_table_size > compaction_config.target_file_size_base))
1212 && hybrid_vnode_count > 0
1213 {
1214 compact_task
1215 .table_vnode_partition
1216 .insert(table_id, hybrid_vnode_count);
1217 } else if compact_table_size > compaction_config.target_file_size_base {
1218 compact_task.table_vnode_partition.insert(table_id, 1);
1219 }
1220 }
1221
1222 compact_task
1223 .table_vnode_partition
1224 .retain(|table_id, _| compact_table_ids.contains(table_id));
1225 }
1226
1227 pub(crate) fn calculate_vnode_partition(
1228 &self,
1229 compact_task: &mut CompactTask,
1230 compaction_config: &CompactionConfig,
1231 compact_table_ids: &[TableId],
1232 ) {
1233 if compact_task.target_level > compact_task.base_level {
1238 return;
1239 }
1240
1241 self.apply_split_weight_by_vnode_partition(
1243 compact_task,
1244 compaction_config,
1245 compact_table_ids,
1246 );
1247 }
1248
1249 fn build_ready_compact_task(
1250 &self,
1251 picked_task: PickedCompactionTask,
1252 context: CompactTaskBuildContext,
1253 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
1254 all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
1255 ) -> BuiltCompactTask {
1256 let compaction_config = context.compaction_config.clone();
1257 let (mut compact_task, compact_table_ids) = build_base_compact_task(picked_task, context);
1258
1259 if compact_task.is_trivial_reclaim() {
1260 compact_task.task_status = TaskStatus::Success;
1261 compact_task.sorted_output_ssts.clear();
1262 return BuiltCompactTask::MetaFinished(compact_task);
1263 }
1264
1265 if compact_task.is_trivial_move_task() {
1266 compact_task.task_status = TaskStatus::Success;
1267 compact_task.sorted_output_ssts = compact_task.input_ssts[0]
1268 .read_sstable_infos()
1269 .cloned()
1270 .collect();
1271 return BuiltCompactTask::MetaFinished(compact_task);
1272 }
1273
1274 self.prepare_compact_task_for_assignment(
1275 &mut compact_task,
1276 compaction_config.as_ref(),
1277 &compact_table_ids,
1278 safe_epoch_table_watermarks_impl(table_watermarks, &compact_table_ids),
1279 all_versioned_table_schemas,
1280 );
1281
1282 BuiltCompactTask::PendingAssignment(compact_task)
1283 }
1284
 /// Finishes a compact task that is going to be handed to a compactor.
 ///
 /// First runs `calculate_vnode_partition` on the task, then calls
 /// `attach_compact_task_table_metadata` with the given table ids, table watermarks and
 /// versioned table schemas.
 fn prepare_compact_task_for_assignment(
 &self,
 compact_task: &mut CompactTask,
 compaction_config: &CompactionConfig,
 compact_table_ids: &[TableId],
 table_watermarks: BTreeMap<TableId, TableWatermarks>,
 all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
 ) {
 self.calculate_vnode_partition(compact_task, compaction_config, compact_table_ids);
 attach_compact_task_table_metadata(
 compact_task,
 compact_table_ids,
 table_watermarks,
 all_versioned_table_schemas,
 );
 }
1301
1302 pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
1303 self.compactor_manager.clone()
1304 }
1305
1306 fn register_compaction_task_report_waiter(
1307 &self,
1308 task_id: HummockCompactionTaskId,
1309 ) -> Receiver<CompactionTaskReportResult> {
1310 let (tx, rx) = tokio::sync::oneshot::channel();
1311 self.compaction_task_report_notifiers
1312 .lock()
1313 .register(task_id, tx);
1314 rx
1315 }
1316
 /// Removes the report waiter registered for `task_id` from the report notifiers.
 fn remove_compaction_task_report_waiter(&self, task_id: HummockCompactionTaskId) {
 self.compaction_task_report_notifiers.lock().remove(task_id);
 }
1320
1321 fn notify_compaction_task_report_waiters(&self, results: Vec<CompactionTaskReportResult>) {
1322 let mut guard = self.compaction_task_report_notifiers.lock();
1323 for result in results {
1324 guard.notify(result);
1325 }
1326 }
1327}
1328
/// Helpers that are only compiled for tests.
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Returns a copy of the assignment recorded for `task_id`, if there is one.
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction = self.compaction.read().await;
        compaction.compact_task_assignment.get(&task_id).cloned()
    }

    /// Reports a single compact task through `report_compact_tasks`.
    ///
    /// When `compact_task` is given, it is first stored as the assignment of `task_id` with
    /// context id `0`. A missing `table_stats_change` is reported as an empty map, and
    /// `object_timestamps` is always empty.
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let assignment = CompactTaskAssignment {
                compact_task: Some(task.into()),
                context_id: 0.into(),
            };
            // The write guard lives only for this branch, so it is released before reporting.
            let mut compaction = self.compaction.write().await;
            compaction
                .compact_task_assignment
                .insert(task_id, assignment);
        }

        let report = ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        };
        self.report_compact_tasks(vec![report]).await?;
        Ok(())
    }
}
1372
/// Why a compaction is being scheduled.
///
/// `CompactionState::try_sched_compaction` only looks at the trigger for
/// `TaskType::Dynamic` requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleTrigger {
 /// For dynamic tasks: clears the group's cooldown and records the current time as its
 /// last new-data time.
 NewData,
 /// For dynamic tasks: the request is refused while the group is in cooldown.
 Periodic,
}
1381
/// A copy of the scheduled `(group, task type)` pairs taken by `CompactionState::snapshot`.
pub struct CompactionScheduleSnapshot {
 // Pairs that were scheduled when the snapshot was taken.
 scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
 // Instant captured while the state lock was held; `CompactionState::unschedule` compares
 // it against the last new-data time of a group.
 snapshot_time: Instant,
}
1390
1391impl CompactionScheduleSnapshot {
1392 const TASK_TYPE_PRIORITY: &[TaskType] = &[
1394 TaskType::Dynamic,
1395 TaskType::SpaceReclaim,
1396 TaskType::Ttl,
1397 TaskType::Tombstone,
1398 TaskType::VnodeWatermark,
1399 ];
1400
1401 pub fn snapshot_time(&self) -> Instant {
1402 self.snapshot_time
1403 }
1404
1405 pub fn pick_compaction_groups_and_type(&self) -> Option<(Vec<CompactionGroupId>, TaskType)> {
1410 let group_ids = self.group_ids_shuffled();
1411 let mut normal_groups = vec![];
1412 for cg_id in group_ids {
1413 if let Some(pick_type) = self.pick_type(cg_id) {
1414 if pick_type == TaskType::Dynamic {
1415 normal_groups.push(cg_id);
1416 } else if normal_groups.is_empty() {
1417 return Some((vec![cg_id], pick_type));
1418 }
1419 }
1420 }
1421 if normal_groups.is_empty() {
1422 None
1423 } else {
1424 Some((normal_groups, TaskType::Dynamic))
1425 }
1426 }
1427
1428 fn group_ids_shuffled(&self) -> Vec<CompactionGroupId> {
1429 let mut group_ids: Vec<_> = self.scheduled.iter().map(|(g, _)| *g).unique().collect();
1430 group_ids.shuffle(&mut thread_rng());
1431 group_ids
1432 }
1433
1434 fn pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1435 Self::TASK_TYPE_PRIORITY
1436 .iter()
1437 .find(|t| self.scheduled.contains(&(group, **t)))
1438 .copied()
1439 }
1440}
1441
/// Scheduling state for compaction groups; all of it sits behind a single mutex.
#[derive(Debug, Default)]
pub struct CompactionState {
 inner: Mutex<CompactionStateInner>,
}
1450
/// The data guarded by the mutex in `CompactionState`.
#[derive(Debug, Default)]
struct CompactionStateInner {
 // `(group, task type)` pairs currently scheduled.
 scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
 // Groups whose periodic dynamic scheduling is refused until new data arrives.
 dynamic_cooldown: HashSet<CompactionGroupId>,
 // Time of the most recent new-data dynamic request per group.
 last_new_data_time: HashMap<CompactionGroupId, Instant>,
}
1459
1460impl CompactionState {
1461 pub fn new() -> Self {
1462 Self {
1463 inner: Default::default(),
1464 }
1465 }
1466
1467 pub fn try_sched_compaction(
1471 &self,
1472 compaction_group: CompactionGroupId,
1473 task_type: TaskType,
1474 trigger: ScheduleTrigger,
1475 ) -> bool {
1476 let mut guard = self.inner.lock();
1477 if task_type == TaskType::Dynamic {
1478 match trigger {
1479 ScheduleTrigger::NewData => {
1480 guard.dynamic_cooldown.remove(&compaction_group);
1481 guard
1482 .last_new_data_time
1483 .insert(compaction_group, Instant::now());
1484 }
1485 ScheduleTrigger::Periodic => {
1486 if guard.dynamic_cooldown.contains(&compaction_group) {
1487 return false;
1488 }
1489 }
1490 }
1491 }
1492 guard.scheduled.insert((compaction_group, task_type))
1493 }
1494
1495 pub fn unschedule(
1498 &self,
1499 compaction_group: CompactionGroupId,
1500 task_type: compact_task::TaskType,
1501 snapshot_time: Instant,
1502 ) {
1503 let mut guard = self.inner.lock();
1504 guard.scheduled.remove(&(compaction_group, task_type));
1505 if task_type == TaskType::Dynamic {
1506 let has_new_data = guard
1507 .last_new_data_time
1508 .get(&compaction_group)
1509 .is_some_and(|t| *t > snapshot_time);
1510 if !has_new_data {
1511 guard.dynamic_cooldown.insert(compaction_group);
1512 }
1513 }
1514 }
1515
1516 pub fn snapshot(&self) -> CompactionScheduleSnapshot {
1518 let guard = self.inner.lock();
1519 let snapshot_time = Instant::now();
1521 CompactionScheduleSnapshot {
1522 scheduled: guard.scheduled.clone(),
1523 snapshot_time,
1524 }
1525 }
1526
1527 pub fn remove_compaction_group(&self, compaction_group: CompactionGroupId) {
1529 let mut guard = self.inner.lock();
1530 guard
1531 .scheduled
1532 .retain(|(group, _)| *group != compaction_group);
1533 guard.dynamic_cooldown.remove(&compaction_group);
1534 guard.last_new_data_time.remove(&compaction_group);
1535 }
1536}
1537
1538impl Compaction {
1539 pub fn get_compact_task_assignments_by_group_id(
1540 &self,
1541 compaction_group_id: CompactionGroupId,
1542 ) -> Vec<CompactTaskAssignment> {
1543 self.compact_task_assignment
1544 .iter()
1545 .filter_map(|(_, assignment)| {
1546 if assignment
1547 .compact_task
1548 .as_ref()
1549 .is_some_and(|task| task.compaction_group_id == compaction_group_id)
1550 {
1551 Some(CompactTaskAssignment {
1552 compact_task: assignment.compact_task.clone(),
1553 context_id: assignment.context_id,
1554 })
1555 } else {
1556 None
1557 }
1558 })
1559 .collect()
1560 }
1561}
1562
/// Statistics of one compaction group together with its configuration.
#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
 pub group_id: CompactionGroupId,
 // NOTE(review): presumably the total size of the group — confirm the unit with the producer.
 pub group_size: u64,
 // NOTE(review): presumably a size per state table of the group — confirm with the producer.
 pub table_statistic: BTreeMap<StateTableId, u64>,
 pub compaction_group_config: CompactionGroup,
}
1570
1571fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1573 table_stats: &mut PbTableStatsMap,
1574 task: &CompactTask,
1575) {
1576 if task.task_type != TaskType::VnodeWatermark {
1577 return;
1578 }
1579 let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
1580 for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1581 assert_eq!(s.table_ids.len(), 1);
1582 let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1583 *e += s.total_key_count;
1584 }
1585 for (table_id, delete_count) in deleted_table_keys {
1586 let Some(stats) = table_stats.get_mut(&table_id) else {
1587 continue;
1588 };
1589 if stats.total_key_count == 0 {
1590 continue;
1591 }
1592 let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1593 let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1594 stats.total_key_count = new_total_key_count;
1596 stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1598 stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1599 }
1600}
1601
/// State of a compaction group as judged by `GroupStateValidator`.
///
/// The non-normal variants carry a human-readable reason.
#[derive(Clone, Debug)]
pub enum GroupState {
    /// No threshold was exceeded.
    Normal,
    /// An emergency threshold was exceeded.
    Emergency(String),
    /// A write-stop threshold was exceeded.
    WriteStop(String),
}

impl GroupState {
    /// Whether this is the `WriteStop` variant.
    pub fn is_write_stop(&self) -> bool {
        match self {
            Self::WriteStop(_) => true,
            Self::Normal | Self::Emergency(_) => false,
        }
    }

    /// Whether this is the `Emergency` variant.
    pub fn is_emergency(&self) -> bool {
        match self {
            Self::Emergency(_) => true,
            Self::Normal | Self::WriteStop(_) => false,
        }
    }

    /// The reason attached to a non-normal state, or `None` for `Normal`.
    pub fn reason(&self) -> Option<&str> {
        match self {
            Self::Normal => None,
            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason.as_str()),
        }
    }
}
1630
/// Field-less type whose associated functions compare L0 figures of a compaction group
/// against the thresholds in a `CompactionConfig`.
#[derive(Clone, Default)]
pub struct GroupStateValidator;
1633
1634impl GroupStateValidator {
1635 pub fn write_stop_sub_level_count(
1636 level_count: usize,
1637 compaction_config: &CompactionConfig,
1638 ) -> bool {
1639 let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1640 level_count > threshold
1641 }
1642
1643 pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1644 l0_size
1645 > compaction_config
1646 .level0_stop_write_threshold_max_size
1647 .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1648 }
1649
1650 pub fn write_stop_l0_file_count(
1651 l0_file_count: usize,
1652 compaction_config: &CompactionConfig,
1653 ) -> bool {
1654 l0_file_count
1655 > compaction_config
1656 .level0_stop_write_threshold_max_sst_count
1657 .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1658 as usize
1659 }
1660
1661 pub fn emergency_l0_file_count(
1662 l0_file_count: usize,
1663 compaction_config: &CompactionConfig,
1664 ) -> bool {
1665 l0_file_count
1666 > compaction_config
1667 .emergency_level0_sst_file_count
1668 .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1669 as usize
1670 }
1671
1672 pub fn emergency_l0_partition_count(
1673 last_l0_sub_level_partition_count: usize,
1674 compaction_config: &CompactionConfig,
1675 ) -> bool {
1676 last_l0_sub_level_partition_count
1677 > compaction_config
1678 .emergency_level0_sub_level_partition
1679 .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1680 as usize
1681 }
1682
1683 pub fn check_single_group_write_stop(
1684 levels: &Levels,
1685 compaction_config: &CompactionConfig,
1686 ) -> GroupState {
1687 if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1688 return GroupState::WriteStop(format!(
1689 "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1690 levels.l0.sub_levels.len(),
1691 compaction_config.level0_stop_write_threshold_sub_level_number
1692 ));
1693 }
1694
1695 if Self::write_stop_l0_file_count(
1696 levels
1697 .l0
1698 .sub_levels
1699 .iter()
1700 .map(|l| l.table_infos.len())
1701 .sum(),
1702 compaction_config,
1703 ) {
1704 return GroupState::WriteStop(format!(
1705 "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1706 levels
1707 .l0
1708 .sub_levels
1709 .iter()
1710 .map(|l| l.table_infos.len())
1711 .sum::<usize>(),
1712 compaction_config
1713 .level0_stop_write_threshold_max_sst_count
1714 .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1715 ));
1716 }
1717
1718 if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1719 return GroupState::WriteStop(format!(
1720 "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1721 levels.l0.total_file_size,
1722 compaction_config
1723 .level0_stop_write_threshold_max_size
1724 .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1725 ));
1726 }
1727
1728 GroupState::Normal
1729 }
1730
1731 pub fn check_single_group_emergency(
1732 levels: &Levels,
1733 compaction_config: &CompactionConfig,
1734 ) -> GroupState {
1735 if Self::emergency_l0_file_count(
1736 levels
1737 .l0
1738 .sub_levels
1739 .iter()
1740 .map(|l| l.table_infos.len())
1741 .sum(),
1742 compaction_config,
1743 ) {
1744 return GroupState::Emergency(format!(
1745 "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1746 levels
1747 .l0
1748 .sub_levels
1749 .iter()
1750 .map(|l| l.table_infos.len())
1751 .sum::<usize>(),
1752 compaction_config
1753 .emergency_level0_sst_file_count
1754 .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1755 ));
1756 }
1757
1758 if Self::emergency_l0_partition_count(
1759 levels
1760 .l0
1761 .sub_levels
1762 .first()
1763 .map(|l| l.table_infos.len())
1764 .unwrap_or(0),
1765 compaction_config,
1766 ) {
1767 return GroupState::Emergency(format!(
1768 "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1769 levels
1770 .l0
1771 .sub_levels
1772 .first()
1773 .map(|l| l.table_infos.len())
1774 .unwrap_or(0),
1775 compaction_config
1776 .emergency_level0_sub_level_partition
1777 .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1778 ));
1779 }
1780
1781 GroupState::Normal
1782 }
1783
1784 pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1785 let state = Self::check_single_group_write_stop(levels, compaction_config);
1786 if state.is_write_stop() {
1787 return state;
1788 }
1789
1790 Self::check_single_group_emergency(levels, compaction_config)
1791 }
1792}
1793
// Pins the value of `compaction_task_id_refill_capacity` in the test `MetaOpts`.
#[cfg(test)]
mod prefetched_task_id_tests {
 use crate::manager::MetaOpts;

 // `MetaOpts::test(false)` is expected to carry a refill capacity of 64.
 #[test]
 fn test_compaction_task_id_refill_capacity_default() {
 assert_eq!(MetaOpts::test(false).compaction_task_id_refill_capacity, 64);
 }
}
1803
// Unit tests for `CompactionState` and `CompactionScheduleSnapshot`.
#[cfg(test)]
mod compaction_state_tests {
 use risingwave_pb::hummock::compact_task::TaskType;

 use super::*;

 // Scheduling is idempotent per (group, type) pair and unscheduling removes only that pair.
 #[test]
 fn test_basic_schedule_and_unschedule() {
 let state = CompactionState::new();
 let group_id: CompactionGroupId = 1.into();

 // First insert succeeds, the duplicate is rejected, a different type is independent.
 assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
 assert!(!state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
 assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));

 let snapshot = state.snapshot();
 assert!(snapshot.scheduled.contains(&(group_id, TaskType::Dynamic)));
 assert!(snapshot.scheduled.contains(&(group_id, TaskType::Ttl)));

 state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
 let snapshot2 = state.snapshot();
 assert!(!snapshot2.scheduled.contains(&(group_id, TaskType::Dynamic)));
 assert!(snapshot2.scheduled.contains(&(group_id, TaskType::Ttl)));
 }

 // After a dynamic task is unscheduled without newer data, periodic triggers are refused.
 #[test]
 fn test_cooldown_blocks_periodic_trigger() {
 let state = CompactionState::new();
 let group_id: CompactionGroupId = 1.into();

 assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
 let snapshot = state.snapshot();
 state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

 assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

 assert!(!state.try_sched_compaction(
 group_id,
 TaskType::Dynamic,
 ScheduleTrigger::Periodic
 ));
 }

 // A new-data trigger removes the group from cooldown and schedules it again.
 #[test]
 fn test_new_data_clears_cooldown() {
 let state = CompactionState::new();
 let group_id: CompactionGroupId = 1.into();

 assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
 let snapshot = state.snapshot();
 state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
 assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

 assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
 assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id));
 }

 // Cooldown is neither created by nor applied to non-dynamic task types.
 #[test]
 fn test_cooldown_only_affects_dynamic_type() {
 let state = CompactionState::new();
 let group_id: CompactionGroupId = 1.into();

 // Put `group_id` into cooldown via a dynamic task.
 assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
 let snapshot = state.snapshot();
 state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

 // Unscheduling a non-dynamic task must not create a cooldown entry.
 let group_id_2: CompactionGroupId = 2.into();
 assert!(state.try_sched_compaction(group_id_2, TaskType::Ttl, ScheduleTrigger::Periodic));
 let snapshot2 = state.snapshot();
 state.unschedule(group_id_2, TaskType::Ttl, snapshot2.snapshot_time());
 assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id_2));

 // Periodic non-dynamic tasks are still accepted for the group that is in cooldown.
 assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
 assert!(state.try_sched_compaction(
 group_id,
 TaskType::SpaceReclaim,
 ScheduleTrigger::Periodic
 ));
 }

 // New data recorded after the snapshot prevents the cooldown on unschedule.
 #[test]
 fn test_race_condition_new_data_after_snapshot() {
 let state = CompactionState::new();
 let group_id: CompactionGroupId = 1.into();

 assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
 let snapshot = state.snapshot();

 {
 // Simulate data arriving after the snapshot by bumping the timestamp directly.
 let mut guard = state.inner.lock();
 guard.last_new_data_time.insert(group_id, Instant::now());
 }

 state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
 assert!(
 !state.inner.lock().dynamic_cooldown.contains(&group_id),
 "Should skip cooldown when new data arrived after snapshot"
 );
 }

 // Removing a group clears its scheduled pairs, cooldown and last new-data time.
 #[test]
 fn test_remove_compaction_group_cleans_all_state() {
 let state = CompactionState::new();
 let group_id: CompactionGroupId = 1.into();

 assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
 assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
 state.inner.lock().dynamic_cooldown.insert(group_id);

 state.remove_compaction_group(group_id);

 let guard = state.inner.lock();
 assert!(!guard.scheduled.contains(&(group_id, TaskType::Dynamic)));
 assert!(!guard.scheduled.contains(&(group_id, TaskType::Ttl)));
 assert!(!guard.dynamic_cooldown.contains(&group_id));
 assert!(!guard.last_new_data_time.contains_key(&group_id));
 }

 // Task types are added from lowest to highest priority; each addition must take over.
 #[test]
 fn test_snapshot_pick_type_priority() {
 let state = CompactionState::new();
 let group_id: CompactionGroupId = 1.into();

 assert_eq!(state.snapshot().pick_type(group_id), None);

 state.try_sched_compaction(
 group_id,
 TaskType::VnodeWatermark,
 ScheduleTrigger::Periodic,
 );
 assert_eq!(
 state.snapshot().pick_type(group_id),
 Some(TaskType::VnodeWatermark)
 );

 state.try_sched_compaction(group_id, TaskType::Tombstone, ScheduleTrigger::Periodic);
 assert_eq!(
 state.snapshot().pick_type(group_id),
 Some(TaskType::Tombstone)
 );

 state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic);
 assert_eq!(state.snapshot().pick_type(group_id), Some(TaskType::Ttl));

 state.try_sched_compaction(group_id, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);
 assert_eq!(
 state.snapshot().pick_type(group_id),
 Some(TaskType::SpaceReclaim)
 );

 state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData);
 assert_eq!(
 state.snapshot().pick_type(group_id),
 Some(TaskType::Dynamic)
 );
 }

 // Unscheduling one group must not put another group into cooldown.
 #[test]
 fn test_multiple_groups_independent_cooldown() {
 let state = CompactionState::new();
 let g1: CompactionGroupId = 1.into();
 let g2: CompactionGroupId = 2.into();

 state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
 state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);
 let snapshot = state.snapshot();

 state.unschedule(g1, TaskType::Dynamic, snapshot.snapshot_time());

 let guard = state.inner.lock();
 assert!(guard.dynamic_cooldown.contains(&g1));
 assert!(!guard.dynamic_cooldown.contains(&g2));
 }

 // An empty schedule yields no pick.
 #[test]
 fn test_pick_compaction_groups_empty() {
 let state = CompactionState::new();
 let snapshot = state.snapshot();
 assert!(snapshot.pick_compaction_groups_and_type().is_none());
 }

 // Group order is shuffled, so both outcomes are valid: either all dynamic groups
 // together, or the single non-dynamic group alone.
 #[test]
 fn test_pick_compaction_groups_mixed_types() {
 let state = CompactionState::new();
 let g1: CompactionGroupId = 1.into();
 let g2: CompactionGroupId = 2.into();
 let g3: CompactionGroupId = 3.into();

 state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
 state.try_sched_compaction(g2, TaskType::Ttl, ScheduleTrigger::Periodic);
 state.try_sched_compaction(g3, TaskType::Dynamic, ScheduleTrigger::NewData);

 let snapshot = state.snapshot();
 let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();

 if task_type == TaskType::Dynamic {
 assert!(groups.contains(&g1));
 assert!(groups.contains(&g3));
 assert!(!groups.contains(&g2)); } else {
 assert_eq!(task_type, TaskType::Ttl);
 assert_eq!(groups, vec![g2]);
 }
 }

 // When every group is dynamic, all of them are returned in one pick.
 #[test]
 fn test_pick_compaction_groups_all_dynamic() {
 let state = CompactionState::new();
 let g1: CompactionGroupId = 1.into();
 let g2: CompactionGroupId = 2.into();

 state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
 state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);

 let snapshot = state.snapshot();
 let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
 assert_eq!(task_type, TaskType::Dynamic);
 assert!(groups.contains(&g1));
 assert!(groups.contains(&g2));
 }

 // A lone non-dynamic group is returned alone with its own task type.
 #[test]
 fn test_pick_compaction_groups_single_non_dynamic() {
 let state = CompactionState::new();
 let g1: CompactionGroupId = 1.into();

 state.try_sched_compaction(g1, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);

 let snapshot = state.snapshot();
 let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
 assert_eq!(task_type, TaskType::SpaceReclaim);
 assert_eq!(groups, vec![g1]);
 }
}