1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17use std::time::Instant;
18
19use anyhow::Context;
20use compaction_event_loop::{
21 HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
22 HummockCompactorDedicatedEventLoop,
23};
24use fail::fail_point;
25use itertools::Itertools;
26use parking_lot::Mutex;
27use rand::rng as thread_rng;
28use rand::seq::SliceRandom;
29use risingwave_common::catalog::TableId;
30use risingwave_common::config::meta::default::compaction_config;
31use risingwave_common::util::epoch::Epoch;
32use risingwave_hummock_sdk::compact_task::{CompactTask, CompactTaskAssignment, ReportTask};
33use risingwave_hummock_sdk::compaction_group::StateTableId;
34use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_table_watermarks_impl;
35use risingwave_hummock_sdk::level::Levels;
36use risingwave_hummock_sdk::sstable_info::SstableInfo;
37use risingwave_hummock_sdk::table_stats::{
38 PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
39};
40use risingwave_hummock_sdk::table_watermark::TableWatermarks;
41use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
42use risingwave_hummock_sdk::{
43 CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
44 HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
45};
46use risingwave_meta_model::hummock_sequence::COMPACTION_TASK_ID;
47use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
48use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
49use risingwave_pb::hummock::{
50 CompactTaskAssignment as PbCompactTaskAssignment, CompactionConfig, PbCompactStatus,
51 SubscribeCompactionEventRequest, TableOption, compact_task,
52};
53use thiserror_ext::AsReport;
54use tokio::sync::RwLockWriteGuard;
55use tokio::sync::mpsc::UnboundedReceiver;
56use tokio::sync::oneshot::{Receiver, Sender};
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use crate::hummock::compaction::in_progress_compaction::InProgressCompactionView;
62use crate::hummock::compaction::selector::level_selector::PickerInfo;
63use crate::hummock::compaction::selector::{
64 DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
65 ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
66 TtlCompactionSelector, VnodeWatermarkCompactionSelector,
67};
68use crate::hummock::compaction::{
69 CompactStatus, CompactionDeveloperConfig, CompactionSelector,
70 CompactionTask as PickedCompactionTask,
71};
72use crate::hummock::error::{Error, Result};
73use crate::hummock::manager::CompactionTaskReportResult;
74use crate::hummock::manager::compaction::compact_task_builder::{
75 CompactTaskBuildContext, attach_compact_task_table_metadata, build_base_compact_task,
76};
77use crate::hummock::manager::transaction::{
78 HummockVersionStatsTransaction, HummockVersionTransaction,
79};
80use crate::hummock::manager::versioning::Versioning;
81use crate::hummock::metrics_utils::{
82 build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
83 trigger_local_table_stat,
84};
85use crate::hummock::model::CompactionGroup;
86use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
87use crate::manager::META_NODE_ID;
88use crate::model::BTreeMapTransaction;
89
/// Outcome of [`HummockManager::trigger_manual_compaction`] when it does not fail.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ManualCompactionTriggerResult {
    /// A task was sent to a compactor and its report was accepted.
    Submitted,
    /// Nothing was done this time; the caller may try again later.
    Retry,
}
95
96mod compact_task_builder;
97pub mod compaction_event_loop;
98pub mod compaction_group_manager;
99pub mod compaction_group_schedule;
100
101static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
102 [
103 TaskStatus::ManualCanceled,
104 TaskStatus::SendFailCanceled,
105 TaskStatus::AssignFailCanceled,
106 TaskStatus::HeartbeatCanceled,
107 TaskStatus::InvalidGroupCanceled,
108 TaskStatus::NoAvailMemoryResourceCanceled,
109 TaskStatus::NoAvailCpuResourceCanceled,
110 TaskStatus::HeartbeatProgressCanceled,
111 ]
112 .into_iter()
113 .collect()
114});
115
116fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
117 let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
118 HashMap::default();
119 compaction_selectors.insert(
120 compact_task::TaskType::Dynamic,
121 Box::<DynamicLevelSelector>::default(),
122 );
123 compaction_selectors.insert(
124 compact_task::TaskType::SpaceReclaim,
125 Box::<SpaceReclaimCompactionSelector>::default(),
126 );
127 compaction_selectors.insert(
128 compact_task::TaskType::Ttl,
129 Box::<TtlCompactionSelector>::default(),
130 );
131 compaction_selectors.insert(
132 compact_task::TaskType::Tombstone,
133 Box::<TombstoneCompactionSelector>::default(),
134 );
135 compaction_selectors.insert(
136 compact_task::TaskType::VnodeWatermark,
137 Box::<VnodeWatermarkCompactionSelector>::default(),
138 );
139 compaction_selectors
140}
141
142enum BuiltCompactTask {
143 MetaFinished(CompactTask),
144 PendingAssignment(CompactTask),
145}
146
147impl HummockVersionTransaction<'_> {
148 fn apply_compact_task(&mut self, compact_task: &CompactTask) {
149 let mut version_delta = self.new_delta();
150 let trivial_move = compact_task.is_trivial_move_task();
151 version_delta.trivial_move = trivial_move;
152
153 let group_deltas = &mut version_delta
154 .group_deltas
155 .entry(compact_task.compaction_group_id)
156 .or_default()
157 .group_deltas;
158 let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
159 BTreeMap::default();
160
161 for level in &compact_task.input_ssts {
162 let level_idx = level.level_idx;
163
164 removed_table_ids_map
165 .entry(level_idx)
166 .or_default()
167 .extend(level.table_infos.iter().map(|sst| sst.sst_id));
168 }
169
170 for (level_idx, removed_table_ids) in removed_table_ids_map {
171 let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
172 level_idx,
173 0, removed_table_ids,
175 vec![], 0, compact_task.compaction_group_version_id,
178 ));
179
180 group_deltas.push(group_delta);
181 }
182
183 let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
184 compact_task.target_level,
185 compact_task.target_sub_level_id,
186 HashSet::new(), compact_task.sorted_output_ssts.clone(),
188 compact_task.split_weight_by_vnode,
189 compact_task.compaction_group_version_id,
190 ));
191
192 group_deltas.push(group_delta);
193 version_delta.pre_apply();
194 }
195}
196
197#[derive(Default)]
198pub struct Compaction {
199 pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, CompactTaskAssignment>,
201 pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,
203
204 pub _deterministic_mode: bool,
205}
206
207impl HummockManager {
208 pub async fn get_assigned_compact_task_num(&self) -> u64 {
209 self.compaction.read().await.compact_task_assignment.len() as u64
210 }
211
212 pub async fn list_compaction_status(
213 &self,
214 ) -> (Vec<PbCompactStatus>, Vec<PbCompactTaskAssignment>) {
215 let (compaction_statuses, compact_task_assignments) = {
216 let compaction = self.compaction.read().await;
217 (
218 compaction
219 .compaction_statuses
220 .values()
221 .map_into()
222 .collect_vec(),
223 compaction
224 .compact_task_assignment
225 .values()
226 .cloned()
227 .collect_vec(),
228 )
229 };
230
231 (
232 compaction_statuses,
233 compact_task_assignments
234 .into_iter()
235 .map(PbCompactTaskAssignment::from)
236 .collect(),
237 )
238 }
239
240 pub async fn get_compaction_scores(
241 &self,
242 compaction_group_id: CompactionGroupId,
243 ) -> Vec<PickerInfo> {
244 let (status, levels, group) = {
245 let compaction = self.compaction.read().await;
246 let versioning = self.versioning.read().await;
247 let config_manager = self.compaction_group_manager.read().await;
248 match (
249 compaction.compaction_statuses.get(&compaction_group_id),
250 versioning.current_version.levels.get(&compaction_group_id),
251 config_manager.try_get_compaction_group_config(compaction_group_id),
252 ) {
253 (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
254 _ => {
255 return vec![];
256 }
257 }
258 };
259 let dynamic_level_core = DynamicLevelSelectorCore::new(
260 group.compaction_config,
261 Arc::new(CompactionDeveloperConfig::default()),
262 );
263 let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
264 ctx.score_levels
265 }
266}
267
268impl HummockManager {
269 pub fn compaction_event_loop(
270 hummock_manager: Arc<Self>,
271 compactor_streams_change_rx: UnboundedReceiver<(
272 HummockContextId,
273 Streaming<SubscribeCompactionEventRequest>,
274 )>,
275 ) -> Vec<(JoinHandle<()>, Sender<()>)> {
276 let mut join_handle_vec = Vec::default();
277
278 let hummock_compaction_event_handler =
279 HummockCompactionEventHandler::new(hummock_manager.clone());
280
281 let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
282 hummock_manager.clone(),
283 hummock_compaction_event_handler.clone(),
284 );
285
286 let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
287 join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));
288
289 let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
290 hummock_manager.env.opts.clone(),
291 hummock_compaction_event_handler,
292 Some(event_tx),
293 );
294
295 let event_loop = HummockCompactionEventLoop::new(
296 hummock_compaction_event_dispatcher,
297 hummock_manager.metrics.clone(),
298 compactor_streams_change_rx,
299 );
300
301 let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
302 join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
303
304 join_handle_vec
305 }
306
307 pub fn add_compactor_stream(
308 &self,
309 context_id: HummockContextId,
310 req_stream: Streaming<SubscribeCompactionEventRequest>,
311 ) {
312 self.compactor_streams_change_tx
313 .send((context_id, req_stream))
314 .unwrap();
315 }
316}
317
318impl HummockManager {
319 async fn next_compaction_task_id_with_prefetch(&self, refill_capacity: u32) -> Result<u64> {
322 self.prefetched_compaction_task_ids
323 .next(refill_capacity, |count| async move {
324 self.env
325 .hummock_seq
326 .next_interval(COMPACTION_TASK_ID, count)
327 .await
328 })
329 .await
330 }
331
/// Picks compaction tasks for `compaction_groups`, visited in the given order, using `selector`.
///
/// The `compaction` and `versioning` write locks are acquired in that order and held while
/// picking. Each group that still exists in the latest version and still has a config is
/// picked from repeatedly:
/// - `BuiltCompactTask::MetaFinished` tasks are completed on the meta node. The group's
///   `CompactStatus` is notified, table stats are updated, the task is applied to the version
///   and it is collected as a trivial task. Picking stops for all groups once
///   `max_trivial_move_task_count_per_loop` trivial tasks have been collected.
/// - `BuiltCompactTask::PendingAssignment` tasks are inserted into `compact_task_assignment`
///   with `META_NODE_ID` as context, and picking for that group ends.
///
/// No further group is visited once `max_select_count` pending tasks have been picked.
///
/// Returns `(tasks, unschedule_groups)`:
/// - `tasks` holds the pending tasks followed by the meta-finished ones.
/// - `unschedule_groups` holds the visited groups (existing, with a config) that did not
///   yield a pending task. A group interrupted by the trivial-task limit is not included.
///
/// # Errors
/// Fails if fetching versioned table schemas, allocating a task id, or the meta-store commit
/// fails.
pub async fn get_compact_tasks_impl(
    &self,
    compaction_groups: Vec<CompactionGroupId>,
    max_select_count: usize,
    selector: &mut dyn CompactionSelector,
) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
    let deterministic_mode = self.env.opts.compaction_deterministic_test;

    // Lock order: compaction first, then versioning (same as `report_compact_tasks`).
    let mut compaction_guard = self.compaction.write().await;
    let compaction: &mut Compaction = &mut compaction_guard;
    let mut versioning_guard = self.versioning.write().await;
    let versioning: &mut Versioning = &mut versioning_guard;

    let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

    let start_time = Instant::now();
    // All state changes go through transactions that are only persisted by `commit_multi_var!`.
    let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

    let mut compact_task_assignment =
        BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

    let mut version = HummockVersionTransaction::new(
        &mut versioning.current_version,
        &mut versioning.hummock_version_deltas,
        &mut versioning.table_change_log,
        self.env.notification_manager(),
        None,
        &self.metrics,
        &self.env.opts,
    );
    let mut version_stats = HummockVersionStatsTransaction::new(
        &mut versioning.version_stats,
        self.env.notification_manager(),
    );

    if deterministic_mode {
        version.disable_apply_to_txn();
    }
    // Versioned schemas are only needed (and only fetched) when dropped-column reclaim is enabled.
    let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
        self.metadata_manager
            .catalog_controller
            .get_versioned_table_schemas()
            .await
            .map_err(|e| Error::Internal(e.into()))?
    } else {
        HashMap::default()
    };
    let mut unschedule_groups = vec![];
    let mut trivial_tasks = vec![];
    let mut pick_tasks = vec![];
    let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
        &self.env.opts,
    ));
    'outside: for compaction_group_id in compaction_groups {
        if pick_tasks.len() >= max_select_count {
            break;
        }

        // The group may have been removed since the caller chose it.
        if !version
            .latest_version()
            .levels
            .contains_key(&compaction_group_id)
        {
            continue;
        }

        // Skip groups whose config is gone.
        let group_config = {
            let config_manager = self.compaction_group_manager.read().await;

            match config_manager.try_get_compaction_group_config(compaction_group_id) {
                Some(config) => config,
                None => continue,
            }
        };

        // NOTE(review): one id is allocated per group and reused for every task picked from
        // this group in the loop below (trivial ones and the final pending one). Confirm this
        // is intended.
        let task_id = self
            .next_compaction_task_id_with_prefetch(
                self.env.opts.compaction_task_id_refill_capacity,
            )
            .await?;

        // Lazily create the compaction status the first time a group is picked from.
        if !compaction_statuses.contains_key(&compaction_group_id) {
            compaction_statuses.insert(
                compaction_group_id,
                CompactStatus::new(
                    compaction_group_id,
                    group_config.compaction_config.max_level,
                ),
            );
        }
        let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

        let mut stats = LocalSelectorStatistic::default();
        let member_table_ids: Vec<_> = version
            .latest_version()
            .state_table_info
            .compaction_group_member_table_ids(compaction_group_id)
            .iter()
            .copied()
            .collect();

        // Copy the options of member tables so the shared option map's read lock is held only briefly.
        let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

        {
            let guard = self.table_id_to_table_option.read();
            for table_id in &member_table_ids {
                if let Some(opts) = guard.get(table_id) {
                    table_id_to_option.insert(*table_id, *opts);
                }
            }
        }

        let in_progress_compactions = InProgressCompactionView::for_group(
            compact_task_assignment.tree_ref().values(),
            compaction_group_id,
        );

        while let Some(picked_task) = compact_status.get_compact_task(
            version
                .latest_version()
                .get_compaction_group_levels(compaction_group_id),
            version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id),
            task_id as HummockCompactionTaskId,
            &group_config,
            &mut stats,
            selector,
            &table_id_to_option,
            developer_config.clone(),
            &version.latest_version().table_watermarks,
            &version.latest_version().state_table_info,
            &in_progress_compactions,
        ) {
            let compaction_group_levels = version
                .latest_version()
                .get_compaction_group_levels(compaction_group_id);
            let target_level_id = picked_task.input.target_level as u32;
            let is_target_level_last = compaction_group_levels.is_last_level(target_level_id);
            let table_options = table_id_to_option
                .iter()
                .map(|(table_id, table_option)| (*table_id, TableOption::from(table_option)))
                .collect();
            let built_compact_task = self.build_ready_compact_task(
                picked_task,
                CompactTaskBuildContext {
                    task_id,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id: compaction_group_levels
                        .compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    table_options,
                    is_target_level_last,
                    compaction_config: group_config.compaction_config.clone(),
                    current_epoch_time: Epoch::now().0,
                },
                &version.latest_version().table_watermarks,
                &all_versioned_table_schemas,
            );

            match built_compact_task {
                BuiltCompactTask::MetaFinished(compact_task) => {
                    // Finished on the meta node: release it from the status and apply it to
                    // the version right away, then keep picking from this group.
                    let label = compact_task.task_label();
                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_status.report_compact_task(&compact_task);
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    // Bound how many meta-finished tasks a single call applies.
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop
                    {
                        break 'outside;
                    }
                }
                BuiltCompactTask::PendingAssignment(compact_task) => {
                    // Needs a compactor: record the assignment and stop picking from this group.
                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: compact_task.clone(),
                            context_id: META_NODE_ID,
                        },
                    );

                    pick_tasks.push(compact_task);
                    break;
                }
            }

            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
            stats = LocalSelectorStatistic::default();
        }
        // A group is "unscheduled" when it did not contribute the most recent pending task.
        if pick_tasks
            .last()
            .map(|task| task.compaction_group_id != compaction_group_id)
            .unwrap_or(true)
        {
            unschedule_groups.push(compaction_group_id);
        }
        stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
    }

    if !trivial_tasks.is_empty() {
        // Version changes were made, so commit them together with the compaction state.
        commit_multi_var!(
            self.meta_store_ref(),
            compaction_statuses,
            compact_task_assignment,
            version,
            version_stats
        )?;
        self.metrics
            .compact_task_batch_count
            .with_label_values(&["batch_trivial_move"])
            .observe(trivial_tasks.len() as f64);

        for trivial_task in &trivial_tasks {
            self.metrics
                .compact_task_trivial_move_sst_count
                .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                .observe(trivial_task.input_ssts[0].table_infos.len() as _);
        }

        drop(versioning_guard);
    } else {
        // No version change to persist: release the versioning lock first and commit only
        // the compaction status / assignment changes.
        drop(versioning_guard);
        commit_multi_var!(
            self.meta_store_ref(),
            compaction_statuses,
            compact_task_assignment
        )?;
    }
    drop(compaction_guard);
    if !pick_tasks.is_empty() {
        self.metrics
            .compact_task_batch_count
            .with_label_values(&["batch_get_compact_task"])
            .observe(pick_tasks.len() as f64);
    }

    // Post-commit bookkeeping for pending tasks, done without holding the locks.
    for compact_task in &mut pick_tasks {
        let compaction_group_id = compact_task.compaction_group_id;

        // NOTE(review): the heartbeat (and the stored assignment) receive a copy taken before
        // `task_status` is set to `Pending` below. Only the returned task carries `Pending`.
        self.compactor_manager
            .initiate_task_heartbeat(compact_task.clone());

        compact_task.task_status = TaskStatus::Pending;
        let compact_task_statistics = statistics_compact_task(compact_task);

        let level_type_label = build_compact_task_level_type_metrics_label(
            compact_task.input_ssts[0].level_idx as usize,
            compact_task.input_ssts.last().unwrap().level_idx as usize,
        );

        let level_count = compact_task.input_ssts.len();
        if compact_task.input_ssts[0].level_idx == 0 {
            self.metrics
                .l0_compact_level_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(level_count as _);
        }

        self.metrics
            .compact_task_size
            .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
            .observe(compact_task_statistics.total_file_size as _);

        self.metrics
            .compact_task_size
            .with_label_values(&[
                &compaction_group_id.to_string(),
                &format!("{} uncompressed", level_type_label),
            ])
            .observe(compact_task_statistics.total_uncompressed_file_size as _);

        self.metrics
            .compact_task_file_count
            .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
            .observe(compact_task_statistics.total_file_count as _);

        tracing::trace!(
            "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
            compaction_group_id,
            level_count,
            compact_task.input_ssts[0].level_type.as_str_name(),
            compact_task.input_ssts[0].level_idx,
            compact_task.target_level,
            start_time.elapsed(),
            compact_task_statistics
        );
    }

    #[cfg(test)]
    {
        self.check_state_consistency().await;
    }
    // Pending tasks first, then the meta-finished (trivial) tasks.
    pick_tasks.extend(trivial_tasks);
    Ok((pick_tasks, unschedule_groups))
}
662
663 pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
665 fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
666 anyhow::anyhow!("failpoint metastore err")
667 )));
668 let ret = self
669 .cancel_compact_task_impl(vec![task_id], task_status)
670 .await?;
671 Ok(ret[0])
672 }
673
674 pub async fn cancel_compact_tasks(
675 &self,
676 tasks: Vec<u64>,
677 task_status: TaskStatus,
678 ) -> Result<Vec<bool>> {
679 self.cancel_compact_task_impl(tasks, task_status).await
680 }
681
682 async fn cancel_compact_task_impl(
683 &self,
684 task_ids: Vec<u64>,
685 task_status: TaskStatus,
686 ) -> Result<Vec<bool>> {
687 assert!(CANCEL_STATUS_SET.contains(&task_status));
688 let tasks = task_ids
689 .into_iter()
690 .map(|task_id| ReportTask {
691 task_id,
692 task_status,
693 sorted_output_ssts: vec![],
694 table_stats_change: HashMap::default(),
695 object_timestamps: HashMap::default(),
696 })
697 .collect_vec();
698 let rets = self.report_compact_tasks(tasks).await?;
699 #[cfg(test)]
700 {
701 self.check_state_consistency().await;
702 }
703 Ok(rets)
704 }
705
706 async fn get_compact_tasks(
707 &self,
708 mut compaction_groups: Vec<CompactionGroupId>,
709 max_select_count: usize,
710 selector: &mut dyn CompactionSelector,
711 ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
712 fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
713 anyhow::anyhow!("failpoint metastore error")
714 )));
715 compaction_groups.shuffle(&mut thread_rng());
716 let (mut tasks, groups) = self
717 .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
718 .await?;
719 tasks.retain(|task| {
720 if task.task_status == TaskStatus::Success {
721 debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
722 false
723 } else {
724 true
725 }
726 });
727 Ok((tasks, groups))
728 }
729
730 pub async fn get_compact_task(
731 &self,
732 compaction_group_id: CompactionGroupId,
733 selector: &mut dyn CompactionSelector,
734 ) -> Result<Option<CompactTask>> {
735 fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
736 anyhow::anyhow!("failpoint metastore error")
737 )));
738
739 let (normal_tasks, _) = self
740 .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
741 .await?;
742 for task in normal_tasks {
743 if task.task_status != TaskStatus::Success {
744 return Ok(Some(task));
745 }
746 debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
747 }
748 Ok(None)
749 }
750
751 pub async fn manual_get_compact_task(
752 &self,
753 compaction_group_id: CompactionGroupId,
754 manual_compaction_option: ManualCompactionOption,
755 ) -> Result<Option<CompactTask>> {
756 let (task, _) = self
757 .manual_get_compact_task_with_info(compaction_group_id, manual_compaction_option)
758 .await?;
759 Ok(task)
760 }
761
762 pub async fn manual_get_compact_task_with_info(
763 &self,
764 compaction_group_id: CompactionGroupId,
765 manual_compaction_option: ManualCompactionOption,
766 ) -> Result<(Option<CompactTask>, bool)> {
767 let mut selector = ManualCompactionSelector::new(manual_compaction_option);
768 let task = self
769 .get_compact_task(compaction_group_id, &mut selector)
770 .await?;
771 if let Some(err) = selector.validation_error() {
772 return Err(Error::InvalidManualCompactionOption(err.to_owned()));
773 }
774 Ok((task, selector.blocked_by_pending()))
775 }
776
777 pub async fn report_compact_task(
778 &self,
779 task_id: u64,
780 task_status: TaskStatus,
781 sorted_output_ssts: Vec<SstableInfo>,
782 table_stats_change: Option<PbTableStatsMap>,
783 object_timestamps: HashMap<HummockSstableObjectId, u64>,
784 ) -> Result<bool> {
785 let rets = self
786 .report_compact_tasks(vec![ReportTask {
787 task_id,
788 task_status,
789 sorted_output_ssts,
790 table_stats_change: table_stats_change.unwrap_or_default(),
791 object_timestamps,
792 }])
793 .await?;
794 Ok(rets[0])
795 }
796
797 pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
798 let compaction_guard = self.compaction.write().await;
799 let versioning_guard = self.versioning.write().await;
800
801 self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
802 .await
803 }
804
805 pub async fn report_compact_tasks_impl(
813 &self,
814 report_tasks: Vec<ReportTask>,
815 mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
816 mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
817 ) -> Result<Vec<bool>> {
818 let deterministic_mode = self.env.opts.compaction_deterministic_test;
819 let compaction: &mut Compaction = &mut compaction_guard;
820 let start_time = Instant::now();
821 let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
822 let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
823 let mut rets = vec![false; report_tasks.len()];
824 let mut compact_task_assignment =
825 BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
826 let versioning: &mut Versioning = &mut versioning_guard;
828 let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");
829
830 for group_id in original_keys {
832 if !versioning.current_version.levels.contains_key(&group_id) {
833 compact_statuses.remove(group_id);
834 }
835 }
836 let mut tasks = vec![];
837
838 let mut version = HummockVersionTransaction::new(
839 &mut versioning.current_version,
840 &mut versioning.hummock_version_deltas,
841 &mut versioning.table_change_log,
842 self.env.notification_manager(),
843 None,
844 &self.metrics,
845 &self.env.opts,
846 );
847
848 if deterministic_mode {
849 version.disable_apply_to_txn();
850 }
851
852 let mut version_stats = HummockVersionStatsTransaction::new(
853 &mut versioning.version_stats,
854 self.env.notification_manager(),
855 );
856 let mut success_count = 0;
857 let mut report_results = Vec::with_capacity(rets.len());
858 for (idx, task) in report_tasks.into_iter().enumerate() {
859 rets[idx] = true;
860 let task_id = task.task_id;
861 let mut task_status = task.task_status;
862 let mut compact_task = match compact_task_assignment.remove(task.task_id) {
863 Some(compact_task_assignment) => compact_task_assignment.compact_task,
864 None => {
865 tracing::warn!("{}", format!("compact task {} not found", task.task_id));
866 rets[idx] = false;
867 report_results.push(CompactionTaskReportResult {
868 task_id,
869 task_status,
870 reported: false,
871 });
872 continue;
873 }
874 };
875
876 {
877 compact_task.task_status = task.task_status;
879 compact_task.sorted_output_ssts = task.sorted_output_ssts;
880 }
881
882 match compact_statuses.get_mut(compact_task.compaction_group_id) {
883 Some(mut compact_status) => {
884 compact_status.report_compact_task(&compact_task);
885 }
886 None => {
887 compact_task.task_status = TaskStatus::InvalidGroupCanceled;
893 }
894 }
895
896 let is_success = if let TaskStatus::Success = compact_task.task_status {
897 match self
898 .report_compaction_sanity_check(&task.object_timestamps)
899 .await
900 {
901 Err(e) => {
902 warn!(
903 "failed to commit compaction task {} {}",
904 compact_task.task_id,
905 e.as_report()
906 );
907 compact_task.task_status = TaskStatus::RetentionTimeRejected;
908 false
909 }
910 _ => {
911 let group = version
912 .latest_version()
913 .levels
914 .get(&compact_task.compaction_group_id)
915 .unwrap();
916 let is_expired = compact_task.is_expired(group.compaction_group_version_id);
917 if is_expired {
918 compact_task.task_status = TaskStatus::InputOutdatedCanceled;
919 warn!(
920 "The task may be expired because of group split, task:\n {:?}",
921 compact_task_to_string(&compact_task)
922 );
923 }
924 !is_expired
925 }
926 }
927 } else {
928 false
929 };
930 if is_success {
931 success_count += 1;
932 version.apply_compact_task(&compact_task);
933 if purge_prost_table_stats(
934 &mut version_stats.table_stats,
935 version.latest_version(),
936 &HashSet::default(),
937 ) {
938 self.metrics.version_stats.reset();
939 versioning.local_metrics.clear();
940 }
941 add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
942 trigger_local_table_stat(
943 &self.metrics,
944 &mut versioning.local_metrics,
945 &version_stats,
946 &task.table_stats_change,
947 );
948 }
949 task_status = compact_task.task_status;
950 report_results.push(CompactionTaskReportResult {
951 task_id,
952 task_status,
953 reported: rets[idx],
954 });
955 tasks.push(compact_task);
956 }
957 if success_count > 0 {
958 commit_multi_var!(
959 self.meta_store_ref(),
960 compact_statuses,
961 compact_task_assignment,
962 version,
963 version_stats
964 )?;
965
966 self.metrics
967 .compact_task_batch_count
968 .with_label_values(&["batch_report_task"])
969 .observe(success_count as f64);
970 } else {
971 commit_multi_var!(
973 self.meta_store_ref(),
974 compact_statuses,
975 compact_task_assignment
976 )?;
977 }
978
979 self.notify_compaction_task_report_waiters(report_results);
980
981 let mut success_groups = vec![];
982 for compact_task in &tasks {
983 self.compactor_manager
984 .remove_task_heartbeat(compact_task.task_id);
985 tracing::trace!(
986 "Reported compaction task. {}. cost time: {:?}",
987 compact_task_to_string(compact_task),
988 start_time.elapsed(),
989 );
990
991 if !deterministic_mode
992 && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
993 || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
994 {
995 self.try_send_compaction_request(
997 compact_task.compaction_group_id,
998 compact_task::TaskType::Dynamic,
999 );
1000 }
1001
1002 if compact_task.task_status == TaskStatus::Success {
1003 success_groups.push(compact_task.compaction_group_id);
1004 }
1005 }
1006
1007 trigger_compact_tasks_stat(
1008 &self.metrics,
1009 &tasks,
1010 &compaction.compaction_statuses,
1011 &versioning_guard.current_version,
1012 );
1013 drop(versioning_guard);
1014 if !success_groups.is_empty() {
1015 self.try_update_write_limits(&success_groups).await;
1016 }
1017 Ok(rets)
1018 }
1019
1020 pub async fn trigger_compaction_deterministic(
1023 &self,
1024 _base_version_id: HummockVersionId,
1025 compaction_groups: Vec<CompactionGroupId>,
1026 ) -> Result<()> {
1027 self.on_current_version(|old_version| {
1028 tracing::info!(
1029 "Trigger compaction for version {}, groups {:?}",
1030 old_version.id,
1031 compaction_groups
1032 );
1033 })
1034 .await;
1035
1036 if compaction_groups.is_empty() {
1037 return Ok(());
1038 }
1039 for compaction_group in compaction_groups {
1040 self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1041 }
1042 Ok(())
1043 }
1044
    /// Builds one manual compaction task for `compaction_group`, sends it to a
    /// compactor and waits for that task's report.
    ///
    /// Returns:
    /// - `Ok(ManualCompactionTriggerResult::Retry)` when no task could be built while
    ///   `exclusive` is set and picking was blocked by pending tasks, or when the
    ///   report's status is `NoAvailCpuResourceCanceled` / `NoAvailMemoryResourceCanceled`;
    /// - `Ok(ManualCompactionTriggerResult::Submitted)` for any other accepted report,
    ///   whatever its `task_status` (failure statuses included);
    /// - `Err(..)` when no compactor is available, task picking fails
    ///   (`Error::InvalidManualCompactionOption` is returned unchanged, other errors
    ///   are wrapped with context), no task is available, sending fails, the report
    ///   channel is closed, or the report was not accepted (`reported == false`).
    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<ManualCompactionTriggerResult> {
        let start_time = Instant::now();
        let exclusive = manual_compaction_option.exclusive;

        // The compactor is chosen before any task is built; with none available we
        // return an error without picking a task.
        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction No compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        let compact_task = self
            .manual_get_compact_task_with_info(compaction_group, manual_compaction_option)
            .await;
        let (compact_task, blocked_by_pending) = match compact_task {
            Ok((compact_task, blocked_by_pending)) => (compact_task, blocked_by_pending),
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
                // Invalid options are surfaced to the caller as-is (not wrapped).
                if matches!(err, Error::InvalidManualCompactionOption(_)) {
                    return Err(err);
                }

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };
        let compact_task = match compact_task {
            Some(compact_task) => compact_task,
            None => {
                // In exclusive mode, "blocked by pending tasks" is retryable rather
                // than an error.
                if exclusive && blocked_by_pending {
                    return Ok(ManualCompactionTriggerResult::Retry);
                }
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        let task_id = compact_task.task_id;
        let compact_task_string = compact_task_to_string(&compact_task);
        tracing::info!(
            compact_task_string,
            duration = ?start_time.elapsed(),
            "Triggered manual compaction task."
        );

        // The waiter is registered before the task is sent, presumably so that a
        // report arriving right after `send_event` still finds it.
        let report_rx = self.register_compaction_task_report_waiter(task_id);
        if let Err(err) = compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })
        {
            // Drop the registered waiter before returning the send error.
            self.remove_compaction_task_report_waiter(task_id);
            return Err(err.into());
        }

        let report_result = match report_rx.await {
            Ok(result) => result,
            // The sending half was dropped without delivering a result.
            Err(_) => {
                self.remove_compaction_task_report_waiter(task_id);
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction wait report failed. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };
        if !report_result.reported {
            return Err(anyhow::anyhow!(
                "trigger_manual_compaction report not accepted. task_id {}",
                report_result.task_id
            )
            .into());
        }

        // Cancellations for lack of CPU or memory are reported as retryable.
        if report_result.task_status == TaskStatus::NoAvailCpuResourceCanceled
            || report_result.task_status == TaskStatus::NoAvailMemoryResourceCanceled
        {
            return Ok(ManualCompactionTriggerResult::Retry);
        }

        tracing::info!(
            ?report_result,
            duration = ?start_time.elapsed(),
            "Completed manual compaction task."
        );

        Ok(ManualCompactionTriggerResult::Submitted)
    }
1157
1158 pub fn try_send_compaction_request(
1160 &self,
1161 compaction_group: CompactionGroupId,
1162 task_type: compact_task::TaskType,
1163 ) -> bool {
1164 self.compaction_state.try_sched_compaction(
1165 compaction_group,
1166 task_type,
1167 ScheduleTrigger::NewData,
1168 )
1169 }
1170
1171 fn apply_split_weight_by_vnode_partition(
1174 &self,
1175 compact_task: &mut CompactTask,
1176 compaction_config: &CompactionConfig,
1177 compact_table_ids: &[TableId],
1178 ) {
1179 if compaction_config.split_weight_by_vnode > 0 {
1180 for table_id in compact_table_ids {
1181 compact_task
1182 .table_vnode_partition
1183 .insert(*table_id, compact_task.split_weight_by_vnode);
1184 }
1185
1186 return;
1187 }
1188
1189 let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
1191 for input_ssts in &compact_task.input_ssts {
1192 for sst in &input_ssts.table_infos {
1193 for table_id in &sst.table_ids {
1194 *table_size_info.entry(*table_id).or_default() +=
1195 sst.sst_size / (sst.table_ids.len() as u64);
1196 }
1197 }
1198 }
1199
1200 let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
1201 let default_partition_count = self.env.opts.partition_vnode_count;
1202 let compact_task_table_size_partition_threshold_low = self
1203 .env
1204 .opts
1205 .compact_task_table_size_partition_threshold_low;
1206 let compact_task_table_size_partition_threshold_high = self
1207 .env
1208 .opts
1209 .compact_task_table_size_partition_threshold_high;
1210
1211 let table_write_throughput_statistic_manager =
1213 self.table_write_throughput_statistic_manager.read();
1214 let timestamp = chrono::Utc::now().timestamp();
1215
1216 for (table_id, compact_table_size) in table_size_info {
1217 let write_throughput = table_write_throughput_statistic_manager
1218 .get_table_throughput_descending(table_id, timestamp)
1219 .peekable()
1220 .peek()
1221 .map(|item| item.throughput)
1222 .unwrap_or(0);
1223
1224 if compact_table_size > compact_task_table_size_partition_threshold_high
1225 && default_partition_count > 0
1226 {
1227 compact_task
1228 .table_vnode_partition
1229 .insert(table_id, default_partition_count);
1230 } else if (compact_table_size > compact_task_table_size_partition_threshold_low
1231 || (write_throughput > self.env.opts.table_high_write_throughput_threshold
1232 && compact_table_size > compaction_config.target_file_size_base))
1233 && hybrid_vnode_count > 0
1234 {
1235 compact_task
1236 .table_vnode_partition
1237 .insert(table_id, hybrid_vnode_count);
1238 } else if compact_table_size > compaction_config.target_file_size_base {
1239 compact_task.table_vnode_partition.insert(table_id, 1);
1240 }
1241 }
1242
1243 compact_task
1244 .table_vnode_partition
1245 .retain(|table_id, _| compact_table_ids.contains(table_id));
1246 }
1247
1248 pub(crate) fn calculate_vnode_partition(
1249 &self,
1250 compact_task: &mut CompactTask,
1251 compaction_config: &CompactionConfig,
1252 compact_table_ids: &[TableId],
1253 ) {
1254 if compact_task.target_level > compact_task.base_level {
1259 return;
1260 }
1261
1262 self.apply_split_weight_by_vnode_partition(
1264 compact_task,
1265 compaction_config,
1266 compact_table_ids,
1267 );
1268 }
1269
1270 fn build_ready_compact_task(
1271 &self,
1272 picked_task: PickedCompactionTask,
1273 context: CompactTaskBuildContext,
1274 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
1275 all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
1276 ) -> BuiltCompactTask {
1277 let compaction_config = context.compaction_config.clone();
1278 let (mut compact_task, compact_table_ids) = build_base_compact_task(picked_task, context);
1279
1280 if compact_task.is_trivial_reclaim() {
1281 compact_task.task_status = TaskStatus::Success;
1282 compact_task.sorted_output_ssts.clear();
1283 return BuiltCompactTask::MetaFinished(compact_task);
1284 }
1285
1286 if compact_task.is_trivial_move_task() {
1287 compact_task.task_status = TaskStatus::Success;
1288 compact_task.sorted_output_ssts = compact_task.input_ssts[0]
1289 .read_sstable_infos()
1290 .cloned()
1291 .collect();
1292 return BuiltCompactTask::MetaFinished(compact_task);
1293 }
1294
1295 self.prepare_compact_task_for_assignment(
1296 &mut compact_task,
1297 compaction_config.as_ref(),
1298 &compact_table_ids,
1299 safe_epoch_table_watermarks_impl(table_watermarks, &compact_table_ids),
1300 all_versioned_table_schemas,
1301 );
1302
1303 BuiltCompactTask::PendingAssignment(compact_task)
1304 }
1305
1306 fn prepare_compact_task_for_assignment(
1307 &self,
1308 compact_task: &mut CompactTask,
1309 compaction_config: &CompactionConfig,
1310 compact_table_ids: &[TableId],
1311 table_watermarks: BTreeMap<TableId, TableWatermarks>,
1312 all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
1313 ) {
1314 self.calculate_vnode_partition(compact_task, compaction_config, compact_table_ids);
1315 attach_compact_task_table_metadata(
1316 compact_task,
1317 compact_table_ids,
1318 table_watermarks,
1319 all_versioned_table_schemas,
1320 );
1321 }
1322
1323 pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
1324 self.compactor_manager.clone()
1325 }
1326
1327 fn register_compaction_task_report_waiter(
1328 &self,
1329 task_id: HummockCompactionTaskId,
1330 ) -> Receiver<CompactionTaskReportResult> {
1331 let (tx, rx) = tokio::sync::oneshot::channel();
1332 self.compaction_task_report_notifiers
1333 .lock()
1334 .register(task_id, tx);
1335 rx
1336 }
1337
1338 fn remove_compaction_task_report_waiter(&self, task_id: HummockCompactionTaskId) {
1339 self.compaction_task_report_notifiers.lock().remove(task_id);
1340 }
1341
1342 fn notify_compaction_task_report_waiters(&self, results: Vec<CompactionTaskReportResult>) {
1343 let mut guard = self.compaction_task_report_notifiers.lock();
1344 for result in results {
1345 guard.notify(result);
1346 }
1347 }
1348}
1349
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Test helper: returns a clone of the assignment recorded for `task_id`, if any.
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        self.compaction
            .read()
            .await
            .compact_task_assignment
            .get(&task_id)
            .cloned()
    }

    /// Test helper: reports a single task result through `report_compact_tasks`.
    ///
    /// When `compact_task` is given, it is first recorded as the assignment for
    /// `task_id` (with context id 0). A missing `table_stats_change` is reported as
    /// an empty map, and `object_timestamps` is always empty.
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(compact_task) = compact_task {
            let assignment = CompactTaskAssignment {
                compact_task,
                context_id: 0.into(),
            };
            // The write guard is a temporary of this statement, so it is released
            // before `report_compact_tasks` runs below.
            self.compaction
                .write()
                .await
                .compact_task_assignment
                .insert(task_id, assignment);
        }

        let report = ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        };
        self.report_compact_tasks(vec![report]).await?;
        Ok(())
    }
}
1393
/// What prompted a call to `CompactionState::try_sched_compaction`.
///
/// The trigger only changes behavior for `TaskType::Dynamic` scheduling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleTrigger {
    /// Scheduling prompted by new data. For `Dynamic` tasks it clears the group's
    /// cooldown and records the current instant as the group's last new-data time.
    NewData,
    /// Scheduling from a periodic pass. For `Dynamic` tasks it is refused while the
    /// group is in cooldown.
    Periodic,
}
1402
/// Point-in-time copy of the scheduled `(group, task type)` pairs produced by
/// `CompactionState::snapshot`, plus the instant it was taken.
pub struct CompactionScheduleSnapshot {
    /// Scheduled `(group, task type)` pairs at snapshot time.
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    /// Taken while the state lock was held; passed back to
    /// `CompactionState::unschedule` to tell whether new data arrived afterwards.
    snapshot_time: Instant,
}
1411
1412impl CompactionScheduleSnapshot {
1413 const TASK_TYPE_PRIORITY: &[TaskType] = &[
1415 TaskType::Dynamic,
1416 TaskType::SpaceReclaim,
1417 TaskType::Ttl,
1418 TaskType::Tombstone,
1419 TaskType::VnodeWatermark,
1420 ];
1421
1422 pub fn snapshot_time(&self) -> Instant {
1423 self.snapshot_time
1424 }
1425
1426 pub fn pick_compaction_groups_and_type(&self) -> Option<(Vec<CompactionGroupId>, TaskType)> {
1431 let group_ids = self.group_ids_shuffled();
1432 let mut normal_groups = vec![];
1433 for cg_id in group_ids {
1434 if let Some(pick_type) = self.pick_type(cg_id) {
1435 if pick_type == TaskType::Dynamic {
1436 normal_groups.push(cg_id);
1437 } else if normal_groups.is_empty() {
1438 return Some((vec![cg_id], pick_type));
1439 }
1440 }
1441 }
1442 if normal_groups.is_empty() {
1443 None
1444 } else {
1445 Some((normal_groups, TaskType::Dynamic))
1446 }
1447 }
1448
1449 fn group_ids_shuffled(&self) -> Vec<CompactionGroupId> {
1450 let mut group_ids: Vec<_> = self.scheduled.iter().map(|(g, _)| *g).unique().collect();
1451 group_ids.shuffle(&mut thread_rng());
1452 group_ids
1453 }
1454
1455 fn pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1456 Self::TASK_TYPE_PRIORITY
1457 .iter()
1458 .find(|t| self.scheduled.contains(&(group, **t)))
1459 .copied()
1460 }
1461}
1462
/// Tracks which `(compaction group, task type)` pairs are scheduled, plus the
/// per-group cooldown used to throttle periodic `Dynamic` scheduling.
///
/// All state sits behind a single mutex.
#[derive(Debug, Default)]
pub struct CompactionState {
    inner: Mutex<CompactionStateInner>,
}
1471
/// Mutable state guarded by `CompactionState`'s mutex.
#[derive(Debug, Default)]
struct CompactionStateInner {
    /// `(group, task type)` pairs currently scheduled.
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    /// Groups whose last `Dynamic` unschedule saw no new data since its snapshot.
    /// Periodic `Dynamic` triggers are refused for them until a `NewData` trigger.
    dynamic_cooldown: HashSet<CompactionGroupId>,
    /// Instant of the most recent `NewData` `Dynamic` trigger per group.
    last_new_data_time: HashMap<CompactionGroupId, Instant>,
}
1480
1481impl CompactionState {
1482 pub fn new() -> Self {
1483 Self {
1484 inner: Default::default(),
1485 }
1486 }
1487
1488 pub fn try_sched_compaction(
1492 &self,
1493 compaction_group: CompactionGroupId,
1494 task_type: TaskType,
1495 trigger: ScheduleTrigger,
1496 ) -> bool {
1497 let mut guard = self.inner.lock();
1498 if task_type == TaskType::Dynamic {
1499 match trigger {
1500 ScheduleTrigger::NewData => {
1501 guard.dynamic_cooldown.remove(&compaction_group);
1502 guard
1503 .last_new_data_time
1504 .insert(compaction_group, Instant::now());
1505 }
1506 ScheduleTrigger::Periodic => {
1507 if guard.dynamic_cooldown.contains(&compaction_group) {
1508 return false;
1509 }
1510 }
1511 }
1512 }
1513 guard.scheduled.insert((compaction_group, task_type))
1514 }
1515
1516 pub fn unschedule(
1519 &self,
1520 compaction_group: CompactionGroupId,
1521 task_type: compact_task::TaskType,
1522 snapshot_time: Instant,
1523 ) {
1524 let mut guard = self.inner.lock();
1525 guard.scheduled.remove(&(compaction_group, task_type));
1526 if task_type == TaskType::Dynamic {
1527 let has_new_data = guard
1528 .last_new_data_time
1529 .get(&compaction_group)
1530 .is_some_and(|t| *t > snapshot_time);
1531 if !has_new_data {
1532 guard.dynamic_cooldown.insert(compaction_group);
1533 }
1534 }
1535 }
1536
1537 pub fn snapshot(&self) -> CompactionScheduleSnapshot {
1539 let guard = self.inner.lock();
1540 let snapshot_time = Instant::now();
1542 CompactionScheduleSnapshot {
1543 scheduled: guard.scheduled.clone(),
1544 snapshot_time,
1545 }
1546 }
1547
1548 pub fn remove_compaction_group(&self, compaction_group: CompactionGroupId) {
1550 let mut guard = self.inner.lock();
1551 guard
1552 .scheduled
1553 .retain(|(group, _)| *group != compaction_group);
1554 guard.dynamic_cooldown.remove(&compaction_group);
1555 guard.last_new_data_time.remove(&compaction_group);
1556 }
1557}
1558
1559impl Compaction {
1560 pub fn get_compact_task_assignments_by_group_id(
1561 &self,
1562 compaction_group_id: CompactionGroupId,
1563 ) -> Vec<CompactTaskAssignment> {
1564 self.compact_task_assignment
1565 .values()
1566 .filter_map(|assignment| {
1567 if assignment.compact_task.compaction_group_id == compaction_group_id {
1568 Some(assignment.clone())
1569 } else {
1570 None
1571 }
1572 })
1573 .collect()
1574 }
1575}
1576
/// Statistics for one compaction group: its size, per-state-table sizes and its
/// configuration.
#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    pub group_id: CompactionGroupId,
    // NOTE(review): presumably the group's total size in bytes; confirm with the producer.
    pub group_size: u64,
    // NOTE(review): presumably per-state-table size in bytes; confirm with the producer.
    pub table_statistic: BTreeMap<StateTableId, u64>,
    pub compaction_group_config: CompactionGroup,
}
1583}
1584
1585fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1587 table_stats: &mut PbTableStatsMap,
1588 task: &CompactTask,
1589) {
1590 if task.task_type != TaskType::VnodeWatermark {
1591 return;
1592 }
1593 let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
1594 for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1595 assert_eq!(s.table_ids.len(), 1);
1596 let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1597 *e += s.total_key_count;
1598 }
1599 for (table_id, delete_count) in deleted_table_keys {
1600 let Some(stats) = table_stats.get_mut(&table_id) else {
1601 continue;
1602 };
1603 if stats.total_key_count == 0 {
1604 continue;
1605 }
1606 let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1607 let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1608 stats.total_key_count = new_total_key_count;
1610 stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1612 stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1613 }
1614}
1615
/// Write-limit classification of a compaction group, as produced by
/// `GroupStateValidator::group_state`.
#[derive(Debug, Clone)]
pub enum GroupState {
    /// No emergency or write-stop threshold is exceeded.
    Normal,

    /// An emergency threshold is exceeded; the payload describes which one.
    Emergency(String),

    /// A write-stop threshold is exceeded; the payload describes which one.
    WriteStop(String),
}
1627
1628impl GroupState {
1629 pub fn is_write_stop(&self) -> bool {
1630 matches!(self, Self::WriteStop(_))
1631 }
1632
1633 pub fn is_emergency(&self) -> bool {
1634 matches!(self, Self::Emergency(_))
1635 }
1636
1637 pub fn reason(&self) -> Option<&str> {
1638 match self {
1639 Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
1640 _ => None,
1641 }
1642 }
1643}
1644
/// Namespace for the stateless checks that classify a compaction group's L0 into a
/// `GroupState`.
#[derive(Clone, Default)]
pub struct GroupStateValidator;
1647
1648impl GroupStateValidator {
1649 pub fn write_stop_sub_level_count(
1650 level_count: usize,
1651 compaction_config: &CompactionConfig,
1652 ) -> bool {
1653 let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1654 level_count > threshold
1655 }
1656
1657 pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1658 l0_size
1659 > compaction_config
1660 .level0_stop_write_threshold_max_size
1661 .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1662 }
1663
1664 pub fn write_stop_l0_file_count(
1665 l0_file_count: usize,
1666 compaction_config: &CompactionConfig,
1667 ) -> bool {
1668 l0_file_count
1669 > compaction_config
1670 .level0_stop_write_threshold_max_sst_count
1671 .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1672 as usize
1673 }
1674
1675 pub fn emergency_l0_file_count(
1676 l0_file_count: usize,
1677 compaction_config: &CompactionConfig,
1678 ) -> bool {
1679 l0_file_count
1680 > compaction_config
1681 .emergency_level0_sst_file_count
1682 .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1683 as usize
1684 }
1685
1686 pub fn emergency_l0_partition_count(
1687 last_l0_sub_level_partition_count: usize,
1688 compaction_config: &CompactionConfig,
1689 ) -> bool {
1690 last_l0_sub_level_partition_count
1691 > compaction_config
1692 .emergency_level0_sub_level_partition
1693 .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1694 as usize
1695 }
1696
1697 pub fn check_single_group_write_stop(
1698 levels: &Levels,
1699 compaction_config: &CompactionConfig,
1700 ) -> GroupState {
1701 if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1702 return GroupState::WriteStop(format!(
1703 "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1704 levels.l0.sub_levels.len(),
1705 compaction_config.level0_stop_write_threshold_sub_level_number
1706 ));
1707 }
1708
1709 if Self::write_stop_l0_file_count(
1710 levels
1711 .l0
1712 .sub_levels
1713 .iter()
1714 .map(|l| l.table_infos.len())
1715 .sum(),
1716 compaction_config,
1717 ) {
1718 return GroupState::WriteStop(format!(
1719 "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1720 levels
1721 .l0
1722 .sub_levels
1723 .iter()
1724 .map(|l| l.table_infos.len())
1725 .sum::<usize>(),
1726 compaction_config
1727 .level0_stop_write_threshold_max_sst_count
1728 .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1729 ));
1730 }
1731
1732 if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1733 return GroupState::WriteStop(format!(
1734 "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1735 levels.l0.total_file_size,
1736 compaction_config
1737 .level0_stop_write_threshold_max_size
1738 .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1739 ));
1740 }
1741
1742 GroupState::Normal
1743 }
1744
1745 pub fn check_single_group_emergency(
1746 levels: &Levels,
1747 compaction_config: &CompactionConfig,
1748 ) -> GroupState {
1749 if Self::emergency_l0_file_count(
1750 levels
1751 .l0
1752 .sub_levels
1753 .iter()
1754 .map(|l| l.table_infos.len())
1755 .sum(),
1756 compaction_config,
1757 ) {
1758 return GroupState::Emergency(format!(
1759 "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1760 levels
1761 .l0
1762 .sub_levels
1763 .iter()
1764 .map(|l| l.table_infos.len())
1765 .sum::<usize>(),
1766 compaction_config
1767 .emergency_level0_sst_file_count
1768 .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1769 ));
1770 }
1771
1772 if Self::emergency_l0_partition_count(
1773 levels
1774 .l0
1775 .sub_levels
1776 .first()
1777 .map(|l| l.table_infos.len())
1778 .unwrap_or(0),
1779 compaction_config,
1780 ) {
1781 return GroupState::Emergency(format!(
1782 "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1783 levels
1784 .l0
1785 .sub_levels
1786 .first()
1787 .map(|l| l.table_infos.len())
1788 .unwrap_or(0),
1789 compaction_config
1790 .emergency_level0_sub_level_partition
1791 .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1792 ));
1793 }
1794
1795 GroupState::Normal
1796 }
1797
1798 pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1799 let state = Self::check_single_group_write_stop(levels, compaction_config);
1800 if state.is_write_stop() {
1801 return state;
1802 }
1803
1804 Self::check_single_group_emergency(levels, compaction_config)
1805 }
1806}
1807
#[cfg(test)]
mod prefetched_task_id_tests {
    use crate::manager::MetaOpts;

    /// Pins the default `compaction_task_id_refill_capacity` of the test options.
    #[test]
    fn test_compaction_task_id_refill_capacity_default() {
        let opts = MetaOpts::test(false);
        let expected_capacity = 64;
        assert_eq!(opts.compaction_task_id_refill_capacity, expected_capacity);
    }
}
1817
#[cfg(test)]
mod compaction_state_tests {
    use std::time::Duration;

    use risingwave_pb::hummock::compact_task::TaskType;

    use super::*;

    #[test]
    fn test_basic_schedule_and_unschedule() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // The first schedule of a pair succeeds; scheduling it again is a no-op.
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(!state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        // Another task type for the same group is tracked independently.
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));

        let snapshot = state.snapshot();
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Ttl)));

        // Unscheduling one type leaves the other in place.
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        let snapshot2 = state.snapshot();
        assert!(!snapshot2.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot2.scheduled.contains(&(group_id, TaskType::Ttl)));
    }

    #[test]
    fn test_cooldown_blocks_periodic_trigger() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        // No new data after the snapshot, so the group enters cooldown.
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        // Periodic Dynamic triggers are refused during cooldown.
        assert!(!state.try_sched_compaction(
            group_id,
            TaskType::Dynamic,
            ScheduleTrigger::Periodic
        ));
    }

    #[test]
    fn test_new_data_clears_cooldown() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        // A NewData trigger both schedules the group and lifts its cooldown.
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id));
    }

    #[test]
    fn test_cooldown_only_affects_dynamic_type() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        // Unscheduling a non-Dynamic type never puts a group into cooldown.
        let group_id_2: CompactionGroupId = 2.into();
        assert!(state.try_sched_compaction(group_id_2, TaskType::Ttl, ScheduleTrigger::Periodic));
        let snapshot2 = state.snapshot();
        state.unschedule(group_id_2, TaskType::Ttl, snapshot2.snapshot_time());
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id_2));

        // A group in Dynamic cooldown can still get periodic non-Dynamic tasks.
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        assert!(state.try_sched_compaction(
            group_id,
            TaskType::SpaceReclaim,
            ScheduleTrigger::Periodic
        ));
    }

    #[test]
    fn test_race_condition_new_data_after_snapshot() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();

        {
            // Simulate a NewData trigger landing strictly after the snapshot.
            // The timestamp is derived from the snapshot rather than taken from
            // `Instant::now()`: on coarse clocks consecutive `now()` calls can return
            // equal instants, which would make this test flaky.
            let mut guard = state.inner.lock();
            guard
                .last_new_data_time
                .insert(group_id, snapshot.snapshot_time() + Duration::from_millis(1));
        }

        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(
            !state.inner.lock().dynamic_cooldown.contains(&group_id),
            "Should skip cooldown when new data arrived after snapshot"
        );
    }

    #[test]
    fn test_remove_compaction_group_cleans_all_state() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        state.inner.lock().dynamic_cooldown.insert(group_id);

        state.remove_compaction_group(group_id);

        // Scheduled pairs, cooldown and new-data time are all gone.
        let guard = state.inner.lock();
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Ttl)));
        assert!(!guard.dynamic_cooldown.contains(&group_id));
        assert!(!guard.last_new_data_time.contains_key(&group_id));
    }

    #[test]
    fn test_snapshot_pick_type_priority() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert_eq!(state.snapshot().pick_type(group_id), None);

        // Each newly scheduled, higher-priority type takes over the pick.
        state.try_sched_compaction(
            group_id,
            TaskType::VnodeWatermark,
            ScheduleTrigger::Periodic,
        );
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::VnodeWatermark)
        );

        state.try_sched_compaction(group_id, TaskType::Tombstone, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Tombstone)
        );

        state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic);
        assert_eq!(state.snapshot().pick_type(group_id), Some(TaskType::Ttl));

        state.try_sched_compaction(group_id, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::SpaceReclaim)
        );

        state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Dynamic)
        );
    }

    #[test]
    fn test_multiple_groups_independent_cooldown() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);
        let snapshot = state.snapshot();

        // Only the unscheduled group enters cooldown.
        state.unschedule(g1, TaskType::Dynamic, snapshot.snapshot_time());

        let guard = state.inner.lock();
        assert!(guard.dynamic_cooldown.contains(&g1));
        assert!(!guard.dynamic_cooldown.contains(&g2));
    }

    #[test]
    fn test_pick_compaction_groups_empty() {
        let state = CompactionState::new();
        let snapshot = state.snapshot();
        assert!(snapshot.pick_compaction_groups_and_type().is_none());
    }

    #[test]
    fn test_pick_compaction_groups_mixed_types() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();
        let g3: CompactionGroupId = 3.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Ttl, ScheduleTrigger::Periodic);
        state.try_sched_compaction(g3, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();

        // Visit order is random, so either outcome is valid.
        if task_type == TaskType::Dynamic {
            // A Dynamic group came first: every Dynamic group is batched and the
            // Ttl group is left for a later pick.
            assert_eq!(groups.len(), 2);
            assert!(groups.contains(&g1));
            assert!(groups.contains(&g3));
            assert!(!groups.contains(&g2));
        } else {
            // The Ttl group came first and is picked alone.
            assert_eq!(task_type, TaskType::Ttl);
            assert_eq!(groups, vec![g2]);
        }
    }

    #[test]
    fn test_pick_compaction_groups_all_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::Dynamic);
        assert!(groups.contains(&g1));
        assert!(groups.contains(&g2));
    }

    #[test]
    fn test_pick_compaction_groups_single_non_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();

        state.try_sched_compaction(g1, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::SpaceReclaim);
        assert_eq!(groups, vec![g1]);
    }
}