1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17use std::time::Instant;
18
19use anyhow::Context;
20use compaction_event_loop::{
21 HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
22 HummockCompactorDedicatedEventLoop,
23};
24use fail::fail_point;
25use itertools::Itertools;
26use parking_lot::Mutex;
27use rand::rng as thread_rng;
28use rand::seq::SliceRandom;
29use risingwave_common::catalog::TableId;
30use risingwave_common::config::meta::default::compaction_config;
31use risingwave_common::util::epoch::Epoch;
32use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
33use risingwave_hummock_sdk::compaction_group::StateTableId;
34use risingwave_hummock_sdk::key_range::KeyRange;
35use risingwave_hummock_sdk::level::Levels;
36use risingwave_hummock_sdk::sstable_info::SstableInfo;
37use risingwave_hummock_sdk::table_stats::{
38 PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
39};
40use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
41use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
42use risingwave_hummock_sdk::{
43 CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
44 HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
45};
46use risingwave_meta_model::hummock_sequence::COMPACTION_TASK_ID;
47use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
48use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
49use risingwave_pb::hummock::{
50 CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
51 SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
52};
53use thiserror_ext::AsReport;
54use tokio::sync::RwLockWriteGuard;
55use tokio::sync::mpsc::UnboundedReceiver;
56use tokio::sync::oneshot::{Receiver, Sender};
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use crate::hummock::compaction::selector::level_selector::PickerInfo;
62use crate::hummock::compaction::selector::{
63 DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
64 ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
65 TtlCompactionSelector, VnodeWatermarkCompactionSelector,
66};
67use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
68use crate::hummock::error::{Error, Result};
69use crate::hummock::manager::CompactionTaskReportResult;
70use crate::hummock::manager::transaction::{
71 HummockVersionStatsTransaction, HummockVersionTransaction,
72};
73use crate::hummock::manager::versioning::Versioning;
74use crate::hummock::metrics_utils::{
75 build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
76 trigger_local_table_stat,
77};
78use crate::hummock::model::CompactionGroup;
79use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
80use crate::manager::META_NODE_ID;
81use crate::model::BTreeMapTransaction;
82
/// Outcome of a request to trigger a manual compaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ManualCompactionTriggerResult {
    /// A compaction task was picked and submitted.
    Submitted,
    /// No task could be picked because the selector was blocked by an
    /// already-pending task (see `trigger_manual_compaction`); the caller
    /// may retry later.
    Retry,
}
88
// Submodules of the compaction manager.
pub mod compaction_event_loop;
pub mod compaction_group_manager;
pub mod compaction_group_schedule;
92
93static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
94 [
95 TaskStatus::ManualCanceled,
96 TaskStatus::SendFailCanceled,
97 TaskStatus::AssignFailCanceled,
98 TaskStatus::HeartbeatCanceled,
99 TaskStatus::InvalidGroupCanceled,
100 TaskStatus::NoAvailMemoryResourceCanceled,
101 TaskStatus::NoAvailCpuResourceCanceled,
102 TaskStatus::HeartbeatProgressCanceled,
103 ]
104 .into_iter()
105 .collect()
106});
107
108fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
109 let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
110 HashMap::default();
111 compaction_selectors.insert(
112 compact_task::TaskType::Dynamic,
113 Box::<DynamicLevelSelector>::default(),
114 );
115 compaction_selectors.insert(
116 compact_task::TaskType::SpaceReclaim,
117 Box::<SpaceReclaimCompactionSelector>::default(),
118 );
119 compaction_selectors.insert(
120 compact_task::TaskType::Ttl,
121 Box::<TtlCompactionSelector>::default(),
122 );
123 compaction_selectors.insert(
124 compact_task::TaskType::Tombstone,
125 Box::<TombstoneCompactionSelector>::default(),
126 );
127 compaction_selectors.insert(
128 compact_task::TaskType::VnodeWatermark,
129 Box::<VnodeWatermarkCompactionSelector>::default(),
130 );
131 compaction_selectors
132}
133
impl HummockVersionTransaction<'_> {
    /// Pre-applies the effect of a (finished) `compact_task` to the in-flight
    /// version delta: one removal delta per input level dropping the task's
    /// input SSTs, plus one insertion delta adding the task's output SSTs to
    /// the target (sub-)level.
    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
        let mut version_delta = self.new_delta();
        let trivial_move = compact_task.is_trivial_move_task();
        version_delta.trivial_move = trivial_move;

        let group_deltas = &mut version_delta
            .group_deltas
            .entry(compact_task.compaction_group_id)
            .or_default()
            .group_deltas;
        // level_idx -> ids of the input SSTs to remove from that level.
        // BTreeMap keeps the removal deltas in ascending level order.
        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
            BTreeMap::default();

        for level in &compact_task.input_ssts {
            let level_idx = level.level_idx;

            removed_table_ids_map
                .entry(level_idx)
                .or_default()
                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
        }

        // Removal deltas: no inserted SSTs; the two `0` positional arguments
        // presumably correspond to sub-level id and vnode partition count,
        // matching the insertion delta below — TODO(review): confirm against
        // `IntraLevelDelta::new`'s signature.
        for (level_idx, removed_table_ids) in removed_table_ids_map {
            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
                level_idx,
                0,
                removed_table_ids,
                vec![],
                0,
                compact_task.compaction_group_version_id,
            ));

            group_deltas.push(group_delta);
        }

        // Insertion delta: adds the compaction output to the target level;
        // removes nothing.
        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
            compact_task.target_level,
            compact_task.target_sub_level_id,
            HashSet::new(),
            compact_task.sorted_output_ssts.clone(),
            compact_task.split_weight_by_vnode,
            compact_task.compaction_group_version_id,
        ));

        group_deltas.push(group_delta);
        version_delta.pre_apply();
    }
}
183
/// In-memory compaction bookkeeping, guarded by `HummockManager::compaction`.
#[derive(Default)]
pub struct Compaction {
    // Tasks currently assigned to a compactor, keyed by compaction task id.
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    // Per-compaction-group compact status (level handlers), keyed by group id.
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    // NOTE(review): underscore-prefixed and never read in this file — appears
    // kept only for struct-layout/compat reasons; confirm before removing.
    pub _deterministic_mode: bool,
}
193
194impl HummockManager {
195 pub async fn get_assigned_compact_task_num(&self) -> u64 {
196 self.compaction.read().await.compact_task_assignment.len() as u64
197 }
198
199 pub async fn list_compaction_status(
200 &self,
201 ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
202 let compaction = self.compaction.read().await;
203 (
204 compaction.compaction_statuses.values().map_into().collect(),
205 compaction
206 .compact_task_assignment
207 .values()
208 .cloned()
209 .collect(),
210 )
211 }
212
213 pub async fn get_compaction_scores(
214 &self,
215 compaction_group_id: CompactionGroupId,
216 ) -> Vec<PickerInfo> {
217 let (status, levels, group) = {
218 let compaction = self.compaction.read().await;
219 let versioning = self.versioning.read().await;
220 let config_manager = self.compaction_group_manager.read().await;
221 match (
222 compaction.compaction_statuses.get(&compaction_group_id),
223 versioning.current_version.levels.get(&compaction_group_id),
224 config_manager.try_get_compaction_group_config(compaction_group_id),
225 ) {
226 (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
227 _ => {
228 return vec![];
229 }
230 }
231 };
232 let dynamic_level_core = DynamicLevelSelectorCore::new(
233 group.compaction_config,
234 Arc::new(CompactionDeveloperConfig::default()),
235 );
236 let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
237 ctx.score_levels
238 }
239}
240
241impl HummockManager {
242 pub fn compaction_event_loop(
243 hummock_manager: Arc<Self>,
244 compactor_streams_change_rx: UnboundedReceiver<(
245 HummockContextId,
246 Streaming<SubscribeCompactionEventRequest>,
247 )>,
248 ) -> Vec<(JoinHandle<()>, Sender<()>)> {
249 let mut join_handle_vec = Vec::default();
250
251 let hummock_compaction_event_handler =
252 HummockCompactionEventHandler::new(hummock_manager.clone());
253
254 let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
255 hummock_manager.clone(),
256 hummock_compaction_event_handler.clone(),
257 );
258
259 let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
260 join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));
261
262 let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
263 hummock_manager.env.opts.clone(),
264 hummock_compaction_event_handler,
265 Some(event_tx),
266 );
267
268 let event_loop = HummockCompactionEventLoop::new(
269 hummock_compaction_event_dispatcher,
270 hummock_manager.metrics.clone(),
271 compactor_streams_change_rx,
272 );
273
274 let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
275 join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
276
277 join_handle_vec
278 }
279
280 pub fn add_compactor_stream(
281 &self,
282 context_id: HummockContextId,
283 req_stream: Streaming<SubscribeCompactionEventRequest>,
284 ) {
285 self.compactor_streams_change_tx
286 .send((context_id, req_stream))
287 .unwrap();
288 }
289}
290
291impl HummockManager {
    /// Returns the next compaction task id, refilling the local prefetch cache
    /// from the global `hummock_seq` sequence (in batches of `refill_capacity`
    /// ids) when the cache runs dry.
    async fn next_compaction_task_id_with_prefetch(&self, refill_capacity: u32) -> Result<u64> {
        self.prefetched_compaction_task_ids
            .next(refill_capacity, |count| async move {
                // Only hits the meta store when the prefetch cache is empty.
                self.env
                    .hummock_seq
                    .next_interval(COMPACTION_TASK_ID, count)
                    .await
            })
            .await
    }
304
    /// Picks up to `max_select_count` compaction tasks across the candidate
    /// `compaction_groups` using `selector`, persisting the resulting state to
    /// the meta store.
    ///
    /// Trivial-move / trivial-reclaim tasks are finalized inline (status set
    /// to `Success`, version delta applied) and appended after the normal
    /// tasks in the returned vector. The second element of the return value
    /// lists groups for which no task was scheduled in this round.
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut dyn CompactionSelector,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        // Lock order: `compaction` before `versioning` — must stay consistent
        // with `report_compact_tasks` to avoid deadlocks.
        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        // Table schemas are only needed when dropped-column reclaim is on.
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            // Only non-trivial tasks count toward the selection budget.
            if pick_tasks.len() >= max_select_count {
                break;
            }

            // Skip groups that no longer exist in the latest version.
            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            // NOTE: a task id is reserved even if the selector ends up picking
            // nothing for this group (ids come from a prefetched sequence).
            let task_id = self
                .next_compaction_task_id_with_prefetch(
                    self.env.opts.compaction_task_id_refill_capacity,
                )
                .await?;

            // Lazily create the compact status for newly seen groups.
            if !compaction_statuses.contains_key(&compaction_group_id) {
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            // Trivial moves are only taken on dynamic/emergency selection.
            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            {
                // Short scope: copy table options out of the shared map.
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            // Loop: keep asking the selector for tasks. Trivial tasks are
            // finalized inline and the loop continues; the first non-trivial
            // task breaks to the next group.
            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                // Encode the configured algorithm name as the protobuf int.
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                // Materialize the selector's pick into a full CompactTask.
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // Delete keys can only be GC'ed when compacting into the
                    // last level.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    max_kv_count_for_xor16: group_config.compaction_config.max_kv_count_for_xor16,
                    max_vnode_key_range_bytes: group_config
                        .compaction_config
                        .max_vnode_key_range_bytes,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    // Trivial path: no compactor involved — finalize the task
                    // right here and apply it to the version transaction.
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        // A trivial move keeps its input SSTs as outputs.
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    // Cap trivial work per loop to bound the commit size.
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    // Normal path: enrich the task and record its assignment;
                    // a compactor will execute and report it later.
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                    );

                    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();

                    // Split watermarks by serde type so the compactor can
                    // apply each kind appropriately.
                    let mut pk_prefix_table_watermarks = BTreeMap::default();
                    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
                    let mut value_table_watermarks = BTreeMap::default();
                    for (table_id, watermark) in version
                        .latest_version()
                        .safe_epoch_table_watermarks(&table_ids_to_be_compacted)
                    {
                        match watermark.watermark_type {
                            WatermarkSerdeType::PkPrefix => {
                                pk_prefix_table_watermarks.insert(table_id, watermark);
                            }
                            WatermarkSerdeType::NonPkPrefix => {
                                non_pk_prefix_table_watermarks.insert(table_id, watermark);
                            }
                            WatermarkSerdeType::Value => {
                                value_table_watermarks.insert(table_id, watermark);
                            }
                        }
                    }
                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;
                    compact_task.value_table_watermarks = value_table_watermarks;

                    // Attach schemas only for tables we have versioned schemas
                    // for (empty map unless dropped-column reclaim is enabled).
                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            all_versioned_table_schemas.get(table_id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID,
                        },
                    );

                    pick_tasks.push(compact_task);
                    // At most one non-trivial task per group per round.
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            // If the last picked task is not for this group, nothing was
            // scheduled here — report the group as unscheduled.
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            // Trivial tasks changed the version, so the version transaction
            // and stats must be committed together with the assignments.
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // No version change: release the versioning lock early and only
            // persist status/assignment bookkeeping.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        // Post-commit, lock-free: start heartbeats and emit metrics per task.
        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // Re-assert Pending (already set at construction above).
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        // Trivial (already-successful) tasks are appended after normal ones.
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }
714
715 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
717 fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
718 anyhow::anyhow!("failpoint metastore err")
719 )));
720 let ret = self
721 .cancel_compact_task_impl(vec![task_id], task_status)
722 .await?;
723 Ok(ret[0])
724 }
725
726 pub async fn cancel_compact_tasks(
727 &self,
728 tasks: Vec<u64>,
729 task_status: TaskStatus,
730 ) -> Result<Vec<bool>> {
731 self.cancel_compact_task_impl(tasks, task_status).await
732 }
733
734 async fn cancel_compact_task_impl(
735 &self,
736 task_ids: Vec<u64>,
737 task_status: TaskStatus,
738 ) -> Result<Vec<bool>> {
739 assert!(CANCEL_STATUS_SET.contains(&task_status));
740 let tasks = task_ids
741 .into_iter()
742 .map(|task_id| ReportTask {
743 task_id,
744 task_status,
745 sorted_output_ssts: vec![],
746 table_stats_change: HashMap::default(),
747 object_timestamps: HashMap::default(),
748 })
749 .collect_vec();
750 let rets = self.report_compact_tasks(tasks).await?;
751 #[cfg(test)]
752 {
753 self.check_state_consistency().await;
754 }
755 Ok(rets)
756 }
757
758 async fn get_compact_tasks(
759 &self,
760 mut compaction_groups: Vec<CompactionGroupId>,
761 max_select_count: usize,
762 selector: &mut dyn CompactionSelector,
763 ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
764 fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
765 anyhow::anyhow!("failpoint metastore error")
766 )));
767 compaction_groups.shuffle(&mut thread_rng());
768 let (mut tasks, groups) = self
769 .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
770 .await?;
771 tasks.retain(|task| {
772 if task.task_status == TaskStatus::Success {
773 debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
774 false
775 } else {
776 true
777 }
778 });
779 Ok((tasks, groups))
780 }
781
782 pub async fn get_compact_task(
783 &self,
784 compaction_group_id: CompactionGroupId,
785 selector: &mut dyn CompactionSelector,
786 ) -> Result<Option<CompactTask>> {
787 fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
788 anyhow::anyhow!("failpoint metastore error")
789 )));
790
791 let (normal_tasks, _) = self
792 .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
793 .await?;
794 for task in normal_tasks {
795 if task.task_status != TaskStatus::Success {
796 return Ok(Some(task));
797 }
798 debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
799 }
800 Ok(None)
801 }
802
803 pub async fn manual_get_compact_task(
804 &self,
805 compaction_group_id: CompactionGroupId,
806 manual_compaction_option: ManualCompactionOption,
807 ) -> Result<Option<CompactTask>> {
808 let (task, _) = self
809 .manual_get_compact_task_with_info(compaction_group_id, manual_compaction_option)
810 .await?;
811 Ok(task)
812 }
813
814 pub async fn manual_get_compact_task_with_info(
815 &self,
816 compaction_group_id: CompactionGroupId,
817 manual_compaction_option: ManualCompactionOption,
818 ) -> Result<(Option<CompactTask>, bool)> {
819 let mut selector = ManualCompactionSelector::new(manual_compaction_option);
820 let task = self
821 .get_compact_task(compaction_group_id, &mut selector)
822 .await?;
823 Ok((task, selector.blocked_by_pending()))
824 }
825
826 pub async fn report_compact_task(
827 &self,
828 task_id: u64,
829 task_status: TaskStatus,
830 sorted_output_ssts: Vec<SstableInfo>,
831 table_stats_change: Option<PbTableStatsMap>,
832 object_timestamps: HashMap<HummockSstableObjectId, u64>,
833 ) -> Result<bool> {
834 let rets = self
835 .report_compact_tasks(vec![ReportTask {
836 task_id,
837 task_status,
838 sorted_output_ssts,
839 table_stats_change: table_stats_change.unwrap_or_default(),
840 object_timestamps,
841 }])
842 .await?;
843 Ok(rets[0])
844 }
845
    /// Reports a batch of compaction task results, acquiring both write locks.
    ///
    /// Lock order: `compaction` before `versioning` — keep consistent with
    /// `get_compact_tasks_impl` to avoid deadlocks.
    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
        let compaction_guard = self.compaction.write().await;
        let versioning_guard = self.versioning.write().await;

        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
            .await
    }
853
854 pub async fn report_compact_tasks_impl(
862 &self,
863 report_tasks: Vec<ReportTask>,
864 mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
865 mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
866 ) -> Result<Vec<bool>> {
867 let deterministic_mode = self.env.opts.compaction_deterministic_test;
868 let compaction: &mut Compaction = &mut compaction_guard;
869 let start_time = Instant::now();
870 let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
871 let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
872 let mut rets = vec![false; report_tasks.len()];
873 let mut compact_task_assignment =
874 BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
875 let versioning: &mut Versioning = &mut versioning_guard;
877 let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");
878
879 for group_id in original_keys {
881 if !versioning.current_version.levels.contains_key(&group_id) {
882 compact_statuses.remove(group_id);
883 }
884 }
885 let mut tasks = vec![];
886
887 let mut version = HummockVersionTransaction::new(
888 &mut versioning.current_version,
889 &mut versioning.hummock_version_deltas,
890 &mut versioning.table_change_log,
891 self.env.notification_manager(),
892 None,
893 &self.metrics,
894 &self.env.opts,
895 );
896
897 if deterministic_mode {
898 version.disable_apply_to_txn();
899 }
900
901 let mut version_stats = HummockVersionStatsTransaction::new(
902 &mut versioning.version_stats,
903 self.env.notification_manager(),
904 );
905 let mut success_count = 0;
906 let mut report_results = Vec::with_capacity(rets.len());
907 for (idx, task) in report_tasks.into_iter().enumerate() {
908 rets[idx] = true;
909 let task_id = task.task_id;
910 let mut task_status = task.task_status;
911 let mut compact_task = match compact_task_assignment.remove(task.task_id) {
912 Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
913 None => {
914 tracing::warn!("{}", format!("compact task {} not found", task.task_id));
915 rets[idx] = false;
916 report_results.push(CompactionTaskReportResult {
917 task_id,
918 task_status,
919 reported: false,
920 });
921 continue;
922 }
923 };
924
925 {
926 compact_task.task_status = task.task_status;
928 compact_task.sorted_output_ssts = task.sorted_output_ssts;
929 }
930
931 match compact_statuses.get_mut(compact_task.compaction_group_id) {
932 Some(mut compact_status) => {
933 compact_status.report_compact_task(&compact_task);
934 }
935 None => {
936 compact_task.task_status = TaskStatus::InvalidGroupCanceled;
942 }
943 }
944
945 let is_success = if let TaskStatus::Success = compact_task.task_status {
946 match self
947 .report_compaction_sanity_check(&task.object_timestamps)
948 .await
949 {
950 Err(e) => {
951 warn!(
952 "failed to commit compaction task {} {}",
953 compact_task.task_id,
954 e.as_report()
955 );
956 compact_task.task_status = TaskStatus::RetentionTimeRejected;
957 false
958 }
959 _ => {
960 let group = version
961 .latest_version()
962 .levels
963 .get(&compact_task.compaction_group_id)
964 .unwrap();
965 let is_expired = compact_task.is_expired(group.compaction_group_version_id);
966 if is_expired {
967 compact_task.task_status = TaskStatus::InputOutdatedCanceled;
968 warn!(
969 "The task may be expired because of group split, task:\n {:?}",
970 compact_task_to_string(&compact_task)
971 );
972 }
973 !is_expired
974 }
975 }
976 } else {
977 false
978 };
979 if is_success {
980 success_count += 1;
981 version.apply_compact_task(&compact_task);
982 if purge_prost_table_stats(
983 &mut version_stats.table_stats,
984 version.latest_version(),
985 &HashSet::default(),
986 ) {
987 self.metrics.version_stats.reset();
988 versioning.local_metrics.clear();
989 }
990 add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
991 trigger_local_table_stat(
992 &self.metrics,
993 &mut versioning.local_metrics,
994 &version_stats,
995 &task.table_stats_change,
996 );
997 }
998 task_status = compact_task.task_status;
999 report_results.push(CompactionTaskReportResult {
1000 task_id,
1001 task_status,
1002 reported: rets[idx],
1003 });
1004 tasks.push(compact_task);
1005 }
1006 if success_count > 0 {
1007 commit_multi_var!(
1008 self.meta_store_ref(),
1009 compact_statuses,
1010 compact_task_assignment,
1011 version,
1012 version_stats
1013 )?;
1014
1015 self.metrics
1016 .compact_task_batch_count
1017 .with_label_values(&["batch_report_task"])
1018 .observe(success_count as f64);
1019 } else {
1020 commit_multi_var!(
1022 self.meta_store_ref(),
1023 compact_statuses,
1024 compact_task_assignment
1025 )?;
1026 }
1027
1028 self.notify_compaction_task_report_waiters(report_results);
1029
1030 let mut success_groups = vec![];
1031 for compact_task in &tasks {
1032 self.compactor_manager
1033 .remove_task_heartbeat(compact_task.task_id);
1034 tracing::trace!(
1035 "Reported compaction task. {}. cost time: {:?}",
1036 compact_task_to_string(compact_task),
1037 start_time.elapsed(),
1038 );
1039
1040 if !deterministic_mode
1041 && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
1042 || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
1043 {
1044 self.try_send_compaction_request(
1046 compact_task.compaction_group_id,
1047 compact_task::TaskType::Dynamic,
1048 );
1049 }
1050
1051 if compact_task.task_status == TaskStatus::Success {
1052 success_groups.push(compact_task.compaction_group_id);
1053 }
1054 }
1055
1056 trigger_compact_tasks_stat(
1057 &self.metrics,
1058 &tasks,
1059 &compaction.compaction_statuses,
1060 &versioning_guard.current_version,
1061 );
1062 drop(versioning_guard);
1063 if !success_groups.is_empty() {
1064 self.try_update_write_limits(&success_groups).await;
1065 }
1066 Ok(rets)
1067 }
1068
1069 pub async fn trigger_compaction_deterministic(
1072 &self,
1073 _base_version_id: HummockVersionId,
1074 compaction_groups: Vec<CompactionGroupId>,
1075 ) -> Result<()> {
1076 self.on_current_version(|old_version| {
1077 tracing::info!(
1078 "Trigger compaction for version {}, groups {:?}",
1079 old_version.id,
1080 compaction_groups
1081 );
1082 })
1083 .await;
1084
1085 if compaction_groups.is_empty() {
1086 return Ok(());
1087 }
1088 for compaction_group in compaction_groups {
1089 self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1090 }
1091 Ok(())
1092 }
1093
    /// Triggers a manual compaction on `compaction_group` and waits for the
    /// compactor to report the task back.
    ///
    /// Flow: pick a compactor, build a manual compact task, register a oneshot
    /// waiter keyed by the task id, send the task to the compactor, then await
    /// the report result delivered by `notify_compaction_task_report_waiters`.
    ///
    /// Returns `Submitted` on a completed report, `Retry` when the caller
    /// should try again (exclusive request blocked by a pending task, or the
    /// compactor cancelled for lack of CPU/memory), and an error otherwise.
    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<ManualCompactionTriggerResult> {
        let start_time = Instant::now();
        let exclusive = manual_compaction_option.exclusive;

        // Fail fast if no compactor is connected at all.
        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction No compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        let compact_task = self
            .manual_get_compact_task_with_info(compaction_group, manual_compaction_option)
            .await;
        let (compact_task, blocked_by_pending) = match compact_task {
            Ok((compact_task, blocked_by_pending)) => (compact_task, blocked_by_pending),
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };
        let compact_task = match compact_task {
            Some(compact_task) => compact_task,
            None => {
                // No task could be produced. If the caller asked for exclusive
                // access and a pending task was the blocker, signal a retry
                // instead of an error.
                if exclusive && blocked_by_pending {
                    return Ok(ManualCompactionTriggerResult::Retry);
                }
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        let task_id = compact_task.task_id;
        let compact_task_string = compact_task_to_string(&compact_task);
        tracing::info!(
            compact_task_string,
            duration = ?start_time.elapsed(),
            "Triggered manual compaction task."
        );

        // Register the waiter BEFORE sending the task so the report cannot
        // race past us; clean it up on every failure path below.
        let report_rx = self.register_compaction_task_report_waiter(task_id);
        if let Err(err) = compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })
        {
            self.remove_compaction_task_report_waiter(task_id);
            return Err(err.into());
        }

        let report_result = match report_rx.await {
            Ok(result) => result,
            Err(_) => {
                // Sender dropped without a report (e.g. notifier cleared).
                self.remove_compaction_task_report_waiter(task_id);
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction wait report failed. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };
        if !report_result.reported {
            return Err(anyhow::anyhow!(
                "trigger_manual_compaction report not accepted. task_id {}",
                report_result.task_id
            )
            .into());
        }

        // Resource-shortage cancellations are transient: ask the caller to retry.
        if report_result.task_status == TaskStatus::NoAvailCpuResourceCanceled
            || report_result.task_status == TaskStatus::NoAvailMemoryResourceCanceled
        {
            return Ok(ManualCompactionTriggerResult::Retry);
        }

        tracing::info!(
            ?report_result,
            duration = ?start_time.elapsed(),
            "Completed manual compaction task."
        );

        Ok(ManualCompactionTriggerResult::Submitted)
    }
1203
1204 pub fn try_send_compaction_request(
1206 &self,
1207 compaction_group: CompactionGroupId,
1208 task_type: compact_task::TaskType,
1209 ) -> bool {
1210 self.compaction_state.try_sched_compaction(
1211 compaction_group,
1212 task_type,
1213 ScheduleTrigger::NewData,
1214 )
1215 }
1216
    /// Fills `compact_task.table_vnode_partition` with a vnode partition count
    /// per table, deciding how the compactor splits output by vnode.
    ///
    /// When the group's config pins `split_weight_by_vnode`, every existing
    /// table gets that weight. Otherwise the partition count is chosen per
    /// table from its share of the input data size and its recent write
    /// throughput, using thresholds from the meta opts.
    fn apply_split_weight_by_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
    ) {
        // Config override: a fixed split weight applies uniformly to all tables.
        if compaction_config.split_weight_by_vnode > 0 {
            for table_id in &compact_task.existing_table_ids {
                compact_task
                    .table_vnode_partition
                    .insert(*table_id, compact_task.split_weight_by_vnode);
            }

            return;
        }

        // Estimate each table's share of the input: an SST's size is split
        // evenly across the tables it contains.
        let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
        let mut existing_table_ids: HashSet<TableId> = HashSet::default();
        for input_ssts in &compact_task.input_ssts {
            for sst in &input_ssts.table_infos {
                existing_table_ids.extend(sst.table_ids.iter());
                for table_id in &sst.table_ids {
                    *table_size_info.entry(*table_id).or_default() +=
                        sst.sst_size / (sst.table_ids.len() as u64);
                }
            }
        }
        // Drop tables that no longer appear in any input SST.
        compact_task
            .existing_table_ids
            .retain(|table_id| existing_table_ids.contains(table_id));

        let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
        let default_partition_count = self.env.opts.partition_vnode_count;
        let compact_task_table_size_partition_threshold_low = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_low;
        let compact_task_table_size_partition_threshold_high = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_high;

        let table_write_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.read();
        let timestamp = chrono::Utc::now().timestamp();

        for (table_id, compact_table_size) in table_size_info {
            // Most recent throughput sample for the table, 0 if none recorded.
            let write_throughput = table_write_throughput_statistic_manager
                .get_table_throughput_descending(table_id, timestamp)
                .peekable()
                .peek()
                .map(|item| item.throughput)
                .unwrap_or(0);

            // Partition-count cascade, from most to least aggressive:
            // very large table -> full partition count; large-or-hot table ->
            // hybrid count; moderately large table -> a single extra split.
            if compact_table_size > compact_task_table_size_partition_threshold_high
                && default_partition_count > 0
            {
                compact_task
                    .table_vnode_partition
                    .insert(table_id, default_partition_count);
            } else if (compact_table_size > compact_task_table_size_partition_threshold_low
                || (write_throughput > self.env.opts.table_high_write_throughput_threshold
                    && compact_table_size > compaction_config.target_file_size_base))
                && hybrid_vnode_count > 0
            {
                // Hot or large table: use the hybrid partition count.
                compact_task
                    .table_vnode_partition
                    .insert(table_id, hybrid_vnode_count);
            } else if compact_table_size > compaction_config.target_file_size_base {
                compact_task.table_vnode_partition.insert(table_id, 1);
            }
        }

        // Keep partition entries only for tables that survived the retain above.
        compact_task
            .table_vnode_partition
            .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
    }
1297
1298 pub(crate) fn calculate_vnode_partition(
1299 &self,
1300 compact_task: &mut CompactTask,
1301 compaction_config: &CompactionConfig,
1302 ) {
1303 if compact_task.target_level > compact_task.base_level {
1308 return;
1309 }
1310
1311 self.apply_split_weight_by_vnode_partition(compact_task, compaction_config);
1313 }
1314
    /// Returns a cloned handle to the shared compactor manager.
    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
1318
1319 fn register_compaction_task_report_waiter(
1320 &self,
1321 task_id: HummockCompactionTaskId,
1322 ) -> Receiver<CompactionTaskReportResult> {
1323 let (tx, rx) = tokio::sync::oneshot::channel();
1324 self.compaction_task_report_notifiers
1325 .lock()
1326 .register(task_id, tx);
1327 rx
1328 }
1329
1330 fn remove_compaction_task_report_waiter(&self, task_id: HummockCompactionTaskId) {
1331 self.compaction_task_report_notifiers.lock().remove(task_id);
1332 }
1333
1334 fn notify_compaction_task_report_waiters(&self, results: Vec<CompactionTaskReportResult>) {
1335 let mut guard = self.compaction_task_report_notifiers.lock();
1336 for result in results {
1337 guard.notify(result);
1338 }
1339 }
1340}
1341
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Test helper: fetches the assignment recorded for `task_id`, if any.
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        self.compaction
            .read()
            .await
            .compact_task_assignment
            .get(&task_id)
            .cloned()
    }

    /// Test helper: optionally records an assignment for `task_id` and then
    /// reports the task with the given status, output SSTs, and stats delta.
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            self.compaction.write().await.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0.into(),
                },
            );
        }

        let report = ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        };
        self.report_compact_tasks(vec![report]).await?;
        Ok(())
    }
}
1385
/// Why a compaction scheduling attempt was made.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleTrigger {
    /// New data arrived in the group; clears any dynamic-compaction cooldown.
    NewData,
    /// Periodic background check; suppressed while the group is cooling down.
    Periodic,
}
1394
/// Point-in-time copy of the scheduled (group, task-type) pairs, captured
/// under the `CompactionState` lock together with the capture timestamp.
pub struct CompactionScheduleSnapshot {
    // All (group, task type) pairs pending at snapshot time.
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    // When the snapshot was taken; `unschedule` compares against this to
    // detect data that arrived after the snapshot.
    snapshot_time: Instant,
}
1403
1404impl CompactionScheduleSnapshot {
1405 const TASK_TYPE_PRIORITY: &[TaskType] = &[
1407 TaskType::Dynamic,
1408 TaskType::SpaceReclaim,
1409 TaskType::Ttl,
1410 TaskType::Tombstone,
1411 TaskType::VnodeWatermark,
1412 ];
1413
1414 pub fn snapshot_time(&self) -> Instant {
1415 self.snapshot_time
1416 }
1417
1418 pub fn pick_compaction_groups_and_type(&self) -> Option<(Vec<CompactionGroupId>, TaskType)> {
1423 let group_ids = self.group_ids_shuffled();
1424 let mut normal_groups = vec![];
1425 for cg_id in group_ids {
1426 if let Some(pick_type) = self.pick_type(cg_id) {
1427 if pick_type == TaskType::Dynamic {
1428 normal_groups.push(cg_id);
1429 } else if normal_groups.is_empty() {
1430 return Some((vec![cg_id], pick_type));
1431 }
1432 }
1433 }
1434 if normal_groups.is_empty() {
1435 None
1436 } else {
1437 Some((normal_groups, TaskType::Dynamic))
1438 }
1439 }
1440
1441 fn group_ids_shuffled(&self) -> Vec<CompactionGroupId> {
1442 let mut group_ids: Vec<_> = self.scheduled.iter().map(|(g, _)| *g).unique().collect();
1443 group_ids.shuffle(&mut thread_rng());
1444 group_ids
1445 }
1446
1447 fn pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1448 Self::TASK_TYPE_PRIORITY
1449 .iter()
1450 .find(|t| self.scheduled.contains(&(group, **t)))
1451 .copied()
1452 }
1453}
1454
/// Tracks which compaction groups have pending compaction requests, with the
/// bookkeeping behind a mutex so schedulers and workers can update it
/// concurrently.
#[derive(Debug, Default)]
pub struct CompactionState {
    inner: Mutex<CompactionStateInner>,
}
1463
#[derive(Debug, Default)]
struct CompactionStateInner {
    // Pending (group, task type) requests awaiting scheduling.
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    // Groups whose dynamic compaction is suppressed for periodic triggers
    // until new data arrives.
    dynamic_cooldown: HashSet<CompactionGroupId>,
    // Last time each group reported new data; compared against a snapshot
    // time in `unschedule` to decide whether to enter cooldown.
    last_new_data_time: HashMap<CompactionGroupId, Instant>,
}
1472
1473impl CompactionState {
1474 pub fn new() -> Self {
1475 Self {
1476 inner: Default::default(),
1477 }
1478 }
1479
1480 pub fn try_sched_compaction(
1484 &self,
1485 compaction_group: CompactionGroupId,
1486 task_type: TaskType,
1487 trigger: ScheduleTrigger,
1488 ) -> bool {
1489 let mut guard = self.inner.lock();
1490 if task_type == TaskType::Dynamic {
1491 match trigger {
1492 ScheduleTrigger::NewData => {
1493 guard.dynamic_cooldown.remove(&compaction_group);
1494 guard
1495 .last_new_data_time
1496 .insert(compaction_group, Instant::now());
1497 }
1498 ScheduleTrigger::Periodic => {
1499 if guard.dynamic_cooldown.contains(&compaction_group) {
1500 return false;
1501 }
1502 }
1503 }
1504 }
1505 guard.scheduled.insert((compaction_group, task_type))
1506 }
1507
1508 pub fn unschedule(
1511 &self,
1512 compaction_group: CompactionGroupId,
1513 task_type: compact_task::TaskType,
1514 snapshot_time: Instant,
1515 ) {
1516 let mut guard = self.inner.lock();
1517 guard.scheduled.remove(&(compaction_group, task_type));
1518 if task_type == TaskType::Dynamic {
1519 let has_new_data = guard
1520 .last_new_data_time
1521 .get(&compaction_group)
1522 .is_some_and(|t| *t > snapshot_time);
1523 if !has_new_data {
1524 guard.dynamic_cooldown.insert(compaction_group);
1525 }
1526 }
1527 }
1528
1529 pub fn snapshot(&self) -> CompactionScheduleSnapshot {
1531 let guard = self.inner.lock();
1532 let snapshot_time = Instant::now();
1534 CompactionScheduleSnapshot {
1535 scheduled: guard.scheduled.clone(),
1536 snapshot_time,
1537 }
1538 }
1539
1540 pub fn remove_compaction_group(&self, compaction_group: CompactionGroupId) {
1542 let mut guard = self.inner.lock();
1543 guard
1544 .scheduled
1545 .retain(|(group, _)| *group != compaction_group);
1546 guard.dynamic_cooldown.remove(&compaction_group);
1547 guard.last_new_data_time.remove(&compaction_group);
1548 }
1549}
1550
1551impl Compaction {
1552 pub fn get_compact_task_assignments_by_group_id(
1553 &self,
1554 compaction_group_id: CompactionGroupId,
1555 ) -> Vec<CompactTaskAssignment> {
1556 self.compact_task_assignment
1557 .iter()
1558 .filter_map(|(_, assignment)| {
1559 if assignment
1560 .compact_task
1561 .as_ref()
1562 .is_some_and(|task| task.compaction_group_id == compaction_group_id)
1563 {
1564 Some(CompactTaskAssignment {
1565 compact_task: assignment.compact_task.clone(),
1566 context_id: assignment.context_id,
1567 })
1568 } else {
1569 None
1570 }
1571 })
1572 .collect()
1573 }
1574}
1575
/// Size statistics for one compaction group, broken down by state table.
#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    // The compaction group these statistics describe.
    pub group_id: CompactionGroupId,
    // Total size of the group, in bytes.
    pub group_size: u64,
    // Per-state-table size within the group, in bytes.
    pub table_statistic: BTreeMap<StateTableId, u64>,
    // The group's compaction configuration.
    pub compaction_group_config: CompactionGroup,
}
1583
1584fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1586 table_stats: &mut PbTableStatsMap,
1587 task: &CompactTask,
1588) {
1589 if task.task_type != TaskType::VnodeWatermark {
1590 return;
1591 }
1592 let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
1593 for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1594 assert_eq!(s.table_ids.len(), 1);
1595 let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1596 *e += s.total_key_count;
1597 }
1598 for (table_id, delete_count) in deleted_table_keys {
1599 let Some(stats) = table_stats.get_mut(&table_id) else {
1600 continue;
1601 };
1602 if stats.total_key_count == 0 {
1603 continue;
1604 }
1605 let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1606 let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1607 stats.total_key_count = new_total_key_count;
1609 stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1611 stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1612 }
1613}
1614
/// Health classification of a compaction group derived from its L0 shape.
#[derive(Debug, Clone)]
pub enum GroupState {
    /// The group is within all configured thresholds.
    Normal,
    /// An emergency threshold was exceeded; the payload describes which one.
    Emergency(String),
    /// A write-stop threshold was exceeded; the payload describes which one.
    WriteStop(String),
}
1626
1627impl GroupState {
1628 pub fn is_write_stop(&self) -> bool {
1629 matches!(self, Self::WriteStop(_))
1630 }
1631
1632 pub fn is_emergency(&self) -> bool {
1633 matches!(self, Self::Emergency(_))
1634 }
1635
1636 pub fn reason(&self) -> Option<&str> {
1637 match self {
1638 Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
1639 _ => None,
1640 }
1641 }
1642}
1643
/// Stateless collection of threshold checks that classify a compaction
/// group's L0 health (write-stop and emergency conditions).
#[derive(Clone, Default)]
pub struct GroupStateValidator;
1646
1647impl GroupStateValidator {
1648 pub fn write_stop_sub_level_count(
1649 level_count: usize,
1650 compaction_config: &CompactionConfig,
1651 ) -> bool {
1652 let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1653 level_count > threshold
1654 }
1655
1656 pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1657 l0_size
1658 > compaction_config
1659 .level0_stop_write_threshold_max_size
1660 .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1661 }
1662
1663 pub fn write_stop_l0_file_count(
1664 l0_file_count: usize,
1665 compaction_config: &CompactionConfig,
1666 ) -> bool {
1667 l0_file_count
1668 > compaction_config
1669 .level0_stop_write_threshold_max_sst_count
1670 .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1671 as usize
1672 }
1673
1674 pub fn emergency_l0_file_count(
1675 l0_file_count: usize,
1676 compaction_config: &CompactionConfig,
1677 ) -> bool {
1678 l0_file_count
1679 > compaction_config
1680 .emergency_level0_sst_file_count
1681 .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1682 as usize
1683 }
1684
1685 pub fn emergency_l0_partition_count(
1686 last_l0_sub_level_partition_count: usize,
1687 compaction_config: &CompactionConfig,
1688 ) -> bool {
1689 last_l0_sub_level_partition_count
1690 > compaction_config
1691 .emergency_level0_sub_level_partition
1692 .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1693 as usize
1694 }
1695
1696 pub fn check_single_group_write_stop(
1697 levels: &Levels,
1698 compaction_config: &CompactionConfig,
1699 ) -> GroupState {
1700 if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1701 return GroupState::WriteStop(format!(
1702 "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1703 levels.l0.sub_levels.len(),
1704 compaction_config.level0_stop_write_threshold_sub_level_number
1705 ));
1706 }
1707
1708 if Self::write_stop_l0_file_count(
1709 levels
1710 .l0
1711 .sub_levels
1712 .iter()
1713 .map(|l| l.table_infos.len())
1714 .sum(),
1715 compaction_config,
1716 ) {
1717 return GroupState::WriteStop(format!(
1718 "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1719 levels
1720 .l0
1721 .sub_levels
1722 .iter()
1723 .map(|l| l.table_infos.len())
1724 .sum::<usize>(),
1725 compaction_config
1726 .level0_stop_write_threshold_max_sst_count
1727 .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1728 ));
1729 }
1730
1731 if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1732 return GroupState::WriteStop(format!(
1733 "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1734 levels.l0.total_file_size,
1735 compaction_config
1736 .level0_stop_write_threshold_max_size
1737 .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1738 ));
1739 }
1740
1741 GroupState::Normal
1742 }
1743
1744 pub fn check_single_group_emergency(
1745 levels: &Levels,
1746 compaction_config: &CompactionConfig,
1747 ) -> GroupState {
1748 if Self::emergency_l0_file_count(
1749 levels
1750 .l0
1751 .sub_levels
1752 .iter()
1753 .map(|l| l.table_infos.len())
1754 .sum(),
1755 compaction_config,
1756 ) {
1757 return GroupState::Emergency(format!(
1758 "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1759 levels
1760 .l0
1761 .sub_levels
1762 .iter()
1763 .map(|l| l.table_infos.len())
1764 .sum::<usize>(),
1765 compaction_config
1766 .emergency_level0_sst_file_count
1767 .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1768 ));
1769 }
1770
1771 if Self::emergency_l0_partition_count(
1772 levels
1773 .l0
1774 .sub_levels
1775 .first()
1776 .map(|l| l.table_infos.len())
1777 .unwrap_or(0),
1778 compaction_config,
1779 ) {
1780 return GroupState::Emergency(format!(
1781 "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1782 levels
1783 .l0
1784 .sub_levels
1785 .first()
1786 .map(|l| l.table_infos.len())
1787 .unwrap_or(0),
1788 compaction_config
1789 .emergency_level0_sub_level_partition
1790 .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1791 ));
1792 }
1793
1794 GroupState::Normal
1795 }
1796
1797 pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1798 let state = Self::check_single_group_write_stop(levels, compaction_config);
1799 if state.is_write_stop() {
1800 return state;
1801 }
1802
1803 Self::check_single_group_emergency(levels, compaction_config)
1804 }
1805}
1806
#[cfg(test)]
mod prefetched_task_id_tests {
    use crate::manager::MetaOpts;

    // Guards the default batch size used when prefetching compaction task ids.
    #[test]
    fn test_compaction_task_id_refill_capacity_default() {
        assert_eq!(MetaOpts::test(false).compaction_task_id_refill_capacity, 64);
    }
}
1816
#[cfg(test)]
mod compaction_state_tests {
    use risingwave_pb::hummock::compact_task::TaskType;

    use super::*;

    // Scheduling is idempotent per (group, type) pair, and unscheduling one
    // type leaves the group's other types scheduled.
    #[test]
    fn test_basic_schedule_and_unschedule() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(!state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));

        let snapshot = state.snapshot();
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Ttl)));

        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        let snapshot2 = state.snapshot();
        assert!(!snapshot2.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot2.scheduled.contains(&(group_id, TaskType::Ttl)));
    }

    // After a dynamic unschedule with no newer data, the group enters
    // cooldown and periodic triggers are rejected.
    #[test]
    fn test_cooldown_blocks_periodic_trigger() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        assert!(!state.try_sched_compaction(
            group_id,
            TaskType::Dynamic,
            ScheduleTrigger::Periodic
        ));
    }

    // A NewData trigger lifts the cooldown established by a prior unschedule.
    #[test]
    fn test_new_data_clears_cooldown() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id));
    }

    // Cooldown applies to Dynamic tasks only: non-dynamic unschedules never
    // set it, and a cooled-down group can still schedule non-dynamic types.
    #[test]
    fn test_cooldown_only_affects_dynamic_type() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        let group_id_2: CompactionGroupId = 2.into();
        assert!(state.try_sched_compaction(group_id_2, TaskType::Ttl, ScheduleTrigger::Periodic));
        let snapshot2 = state.snapshot();
        state.unschedule(group_id_2, TaskType::Ttl, snapshot2.snapshot_time());
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id_2));

        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        assert!(state.try_sched_compaction(
            group_id,
            TaskType::SpaceReclaim,
            ScheduleTrigger::Periodic
        ));
    }

    // If new data is recorded after the snapshot was taken, unschedule must
    // NOT enter cooldown (the data still needs a compaction pass).
    #[test]
    fn test_race_condition_new_data_after_snapshot() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();

        {
            // Simulate new data arriving between snapshot and unschedule.
            let mut guard = state.inner.lock();
            guard.last_new_data_time.insert(group_id, Instant::now());
        }

        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(
            !state.inner.lock().dynamic_cooldown.contains(&group_id),
            "Should skip cooldown when new data arrived after snapshot"
        );
    }

    // Removing a group wipes its scheduled entries, cooldown, and timestamps.
    #[test]
    fn test_remove_compaction_group_cleans_all_state() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        state.inner.lock().dynamic_cooldown.insert(group_id);

        state.remove_compaction_group(group_id);

        let guard = state.inner.lock();
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Ttl)));
        assert!(!guard.dynamic_cooldown.contains(&group_id));
        assert!(!guard.last_new_data_time.contains_key(&group_id));
    }

    // pick_type follows TASK_TYPE_PRIORITY: each higher-priority type that is
    // scheduled displaces the previously picked one.
    #[test]
    fn test_snapshot_pick_type_priority() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert_eq!(state.snapshot().pick_type(group_id), None);

        state.try_sched_compaction(
            group_id,
            TaskType::VnodeWatermark,
            ScheduleTrigger::Periodic,
        );
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::VnodeWatermark)
        );

        state.try_sched_compaction(group_id, TaskType::Tombstone, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Tombstone)
        );

        state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic);
        assert_eq!(state.snapshot().pick_type(group_id), Some(TaskType::Ttl));

        state.try_sched_compaction(group_id, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::SpaceReclaim)
        );

        state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Dynamic)
        );
    }

    // Cooldown state is tracked per group; unscheduling one group must not
    // affect another.
    #[test]
    fn test_multiple_groups_independent_cooldown() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);
        let snapshot = state.snapshot();

        state.unschedule(g1, TaskType::Dynamic, snapshot.snapshot_time());

        let guard = state.inner.lock();
        assert!(guard.dynamic_cooldown.contains(&g1));
        assert!(!guard.dynamic_cooldown.contains(&g2));
    }

    // An empty snapshot yields no pick.
    #[test]
    fn test_pick_compaction_groups_empty() {
        let state = CompactionState::new();
        let snapshot = state.snapshot();
        assert!(snapshot.pick_compaction_groups_and_type().is_none());
    }

    // With mixed task types the result depends on shuffle order: either the
    // dynamic groups are batched together, or a lone non-dynamic group wins.
    #[test]
    fn test_pick_compaction_groups_mixed_types() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();
        let g3: CompactionGroupId = 3.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Ttl, ScheduleTrigger::Periodic);
        state.try_sched_compaction(g3, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();

        if task_type == TaskType::Dynamic {
            assert!(groups.contains(&g1));
            assert!(groups.contains(&g3));
            assert!(!groups.contains(&g2)); } else {
            assert_eq!(task_type, TaskType::Ttl);
            assert_eq!(groups, vec![g2]);
        }
    }

    // All-dynamic scheduling is batched into a single pick.
    #[test]
    fn test_pick_compaction_groups_all_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::Dynamic);
        assert!(groups.contains(&g1));
        assert!(groups.contains(&g2));
    }

    // A single non-dynamic request is returned on its own.
    #[test]
    fn test_pick_compaction_groups_single_non_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();

        state.try_sched_compaction(g1, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::SpaceReclaim);
        assert_eq!(groups, vec![g1]);
    }
}