risingwave_meta/hummock/manager/compaction/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17use std::time::Instant;
18
19use anyhow::Context;
20use compaction_event_loop::{
21    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
22    HummockCompactorDedicatedEventLoop,
23};
24use fail::fail_point;
25use itertools::Itertools;
26use parking_lot::Mutex;
27use rand::rng as thread_rng;
28use rand::seq::SliceRandom;
29use risingwave_common::catalog::TableId;
30use risingwave_common::config::meta::default::compaction_config;
31use risingwave_common::util::epoch::Epoch;
32use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
33use risingwave_hummock_sdk::compaction_group::StateTableId;
34use risingwave_hummock_sdk::key_range::KeyRange;
35use risingwave_hummock_sdk::level::Levels;
36use risingwave_hummock_sdk::sstable_info::SstableInfo;
37use risingwave_hummock_sdk::table_stats::{
38    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
39};
40use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
41use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
42use risingwave_hummock_sdk::{
43    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
44    HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
45};
46use risingwave_meta_model::hummock_sequence::COMPACTION_TASK_ID;
47use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
48use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
49use risingwave_pb::hummock::{
50    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
51    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
52};
53use thiserror_ext::AsReport;
54use tokio::sync::RwLockWriteGuard;
55use tokio::sync::mpsc::UnboundedReceiver;
56use tokio::sync::oneshot::{Receiver, Sender};
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use crate::hummock::compaction::selector::level_selector::PickerInfo;
62use crate::hummock::compaction::selector::{
63    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
64    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
65    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
66};
67use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
68use crate::hummock::error::{Error, Result};
69use crate::hummock::manager::CompactionTaskReportResult;
70use crate::hummock::manager::transaction::{
71    HummockVersionStatsTransaction, HummockVersionTransaction,
72};
73use crate::hummock::manager::versioning::Versioning;
74use crate::hummock::metrics_utils::{
75    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
76    trigger_local_table_stat,
77};
78use crate::hummock::model::CompactionGroup;
79use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
80use crate::manager::META_NODE_ID;
81use crate::model::BTreeMapTransaction;
82
/// Outcome of trying to trigger a manual compaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ManualCompactionTriggerResult {
    /// The manual compaction request was submitted.
    Submitted,
    /// The request could not be submitted this time; the caller may retry.
    Retry,
}
88
89pub mod compaction_event_loop;
90pub mod compaction_group_manager;
91pub mod compaction_group_schedule;
92
93static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
94    [
95        TaskStatus::ManualCanceled,
96        TaskStatus::SendFailCanceled,
97        TaskStatus::AssignFailCanceled,
98        TaskStatus::HeartbeatCanceled,
99        TaskStatus::InvalidGroupCanceled,
100        TaskStatus::NoAvailMemoryResourceCanceled,
101        TaskStatus::NoAvailCpuResourceCanceled,
102        TaskStatus::HeartbeatProgressCanceled,
103    ]
104    .into_iter()
105    .collect()
106});
107
108fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
109    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
110        HashMap::default();
111    compaction_selectors.insert(
112        compact_task::TaskType::Dynamic,
113        Box::<DynamicLevelSelector>::default(),
114    );
115    compaction_selectors.insert(
116        compact_task::TaskType::SpaceReclaim,
117        Box::<SpaceReclaimCompactionSelector>::default(),
118    );
119    compaction_selectors.insert(
120        compact_task::TaskType::Ttl,
121        Box::<TtlCompactionSelector>::default(),
122    );
123    compaction_selectors.insert(
124        compact_task::TaskType::Tombstone,
125        Box::<TombstoneCompactionSelector>::default(),
126    );
127    compaction_selectors.insert(
128        compact_task::TaskType::VnodeWatermark,
129        Box::<VnodeWatermarkCompactionSelector>::default(),
130    );
131    compaction_selectors
132}
133
134impl HummockVersionTransaction<'_> {
135    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
136        let mut version_delta = self.new_delta();
137        let trivial_move = compact_task.is_trivial_move_task();
138        version_delta.trivial_move = trivial_move;
139
140        let group_deltas = &mut version_delta
141            .group_deltas
142            .entry(compact_task.compaction_group_id)
143            .or_default()
144            .group_deltas;
145        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
146            BTreeMap::default();
147
148        for level in &compact_task.input_ssts {
149            let level_idx = level.level_idx;
150
151            removed_table_ids_map
152                .entry(level_idx)
153                .or_default()
154                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
155        }
156
157        for (level_idx, removed_table_ids) in removed_table_ids_map {
158            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
159                level_idx,
160                0, // default
161                removed_table_ids,
162                vec![], // default
163                0,      // default
164                compact_task.compaction_group_version_id,
165            ));
166
167            group_deltas.push(group_delta);
168        }
169
170        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
171            compact_task.target_level,
172            compact_task.target_sub_level_id,
173            HashSet::new(), // default
174            compact_task.sorted_output_ssts.clone(),
175            compact_task.split_weight_by_vnode,
176            compact_task.compaction_group_version_id,
177        ));
178
179        group_deltas.push(group_delta);
180        version_delta.pre_apply();
181    }
182}
183
/// In-memory bookkeeping of ongoing compaction, guarded by the manager's
/// `compaction` lock.
#[derive(Default)]
pub struct Compaction {
    /// Compaction task that is already assigned to a compactor
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// `CompactStatus` of each compaction group
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    // NOTE(review): not read anywhere in this module; appears to be kept only
    // for deterministic-test plumbing — confirm before removing.
    pub _deterministic_mode: bool,
}
193
194impl HummockManager {
195    pub async fn get_assigned_compact_task_num(&self) -> u64 {
196        self.compaction.read().await.compact_task_assignment.len() as u64
197    }
198
199    pub async fn list_compaction_status(
200        &self,
201    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
202        let compaction = self.compaction.read().await;
203        (
204            compaction.compaction_statuses.values().map_into().collect(),
205            compaction
206                .compact_task_assignment
207                .values()
208                .cloned()
209                .collect(),
210        )
211    }
212
213    pub async fn get_compaction_scores(
214        &self,
215        compaction_group_id: CompactionGroupId,
216    ) -> Vec<PickerInfo> {
217        let (status, levels, group) = {
218            let compaction = self.compaction.read().await;
219            let versioning = self.versioning.read().await;
220            let config_manager = self.compaction_group_manager.read().await;
221            match (
222                compaction.compaction_statuses.get(&compaction_group_id),
223                versioning.current_version.levels.get(&compaction_group_id),
224                config_manager.try_get_compaction_group_config(compaction_group_id),
225            ) {
226                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
227                _ => {
228                    return vec![];
229                }
230            }
231        };
232        let dynamic_level_core = DynamicLevelSelectorCore::new(
233            group.compaction_config,
234            Arc::new(CompactionDeveloperConfig::default()),
235        );
236        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
237        ctx.score_levels
238    }
239}
240
241impl HummockManager {
242    pub fn compaction_event_loop(
243        hummock_manager: Arc<Self>,
244        compactor_streams_change_rx: UnboundedReceiver<(
245            HummockContextId,
246            Streaming<SubscribeCompactionEventRequest>,
247        )>,
248    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
249        let mut join_handle_vec = Vec::default();
250
251        let hummock_compaction_event_handler =
252            HummockCompactionEventHandler::new(hummock_manager.clone());
253
254        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
255            hummock_manager.clone(),
256            hummock_compaction_event_handler.clone(),
257        );
258
259        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
260        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));
261
262        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
263            hummock_manager.env.opts.clone(),
264            hummock_compaction_event_handler,
265            Some(event_tx),
266        );
267
268        let event_loop = HummockCompactionEventLoop::new(
269            hummock_compaction_event_dispatcher,
270            hummock_manager.metrics.clone(),
271            compactor_streams_change_rx,
272        );
273
274        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
275        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
276
277        join_handle_vec
278    }
279
280    pub fn add_compactor_stream(
281        &self,
282        context_id: HummockContextId,
283        req_stream: Streaming<SubscribeCompactionEventRequest>,
284    ) {
285        self.compactor_streams_change_tx
286            .send((context_id, req_stream))
287            .unwrap();
288    }
289}
290
291impl HummockManager {
292    /// Gets one compaction task id with best-effort batching while ensuring concurrent callers
293    /// share the same refill instead of wasting an allocated range.
294    async fn next_compaction_task_id_with_prefetch(&self, refill_capacity: u32) -> Result<u64> {
295        self.prefetched_compaction_task_ids
296            .next(refill_capacity, |count| async move {
297                self.env
298                    .hummock_seq
299                    .next_interval(COMPACTION_TASK_ID, count)
300                    .await
301            })
302            .await
303    }
304
    /// Picks up to `max_select_count` compaction tasks across `compaction_groups`
    /// using `selector`.
    ///
    /// Returns the picked tasks (trivial tasks are appended after normal ones)
    /// together with the groups for which nothing could be scheduled this round.
    ///
    /// Trivial move/reclaim tasks are applied to the version and committed here;
    /// normal tasks only persist their status and assignment and are handed to
    /// compactors by the caller.
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut dyn CompactionSelector,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        // Lock order: `compaction` before `versioning` — must match other call sites.
        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        // Dropped-column reclaim needs the current schema of every versioned table;
        // skip the catalog round-trip entirely when the feature is off.
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        // Reuse prefetched task ids from previous loops.
        // Each group consumes at most one task_id (trivial tasks share the same id with normal
        // task). When prefetched ids are exhausted, refill in fixed-size chunks to avoid
        // per-group SQL transactions while keeping the in-memory cache small.
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            // Skip groups that no longer exist in the latest version.
            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // When the last table of a compaction group is deleted, the compaction group (and its
            // config) is destroyed as well. Then a compaction task for this group may come later and
            // cannot find its config.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            // Use prefetched task id if available; when exhausted, refill in chunks first.
            let task_id = self
                .next_compaction_task_id_with_prefetch(
                    self.env.opts.compaction_task_id_refill_capacity,
                )
                .await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                // lazy initialize.
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            // Only dynamic/emergency selection may turn a pick into a trivial move.
            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            // Keep asking the selector for picks until it either produces a
            // non-trivial task (pushed and `break`), runs out of picks, or the
            // trivial-task budget is exhausted (`break 'outside`).
            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                // Materialize the selector's pick into a full `CompactTask`.
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // only gc delete keys in last level because there may be older version in more bottom
                    // level.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    max_kv_count_for_xor16: group_config.compaction_config.max_kv_count_for_xor16,
                    max_vnode_key_range_bytes: group_config
                        .compaction_config
                        .max_vnode_key_range_bytes,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    // Trivial tasks need no compactor: finish them immediately and
                    // apply their effect to the version in this same transaction.
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        // A trivial move's output is exactly its (single-level) input.
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                    );

                    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();

                    // Split the safe-epoch watermarks of the involved tables by
                    // their serde type so the compactor can apply each correctly.
                    let mut pk_prefix_table_watermarks = BTreeMap::default();
                    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
                    let mut value_table_watermarks = BTreeMap::default();
                    for (table_id, watermark) in version
                        .latest_version()
                        .safe_epoch_table_watermarks(&table_ids_to_be_compacted)
                    {
                        match watermark.watermark_type {
                            WatermarkSerdeType::PkPrefix => {
                                pk_prefix_table_watermarks.insert(table_id, watermark);
                            }
                            WatermarkSerdeType::NonPkPrefix => {
                                non_pk_prefix_table_watermarks.insert(table_id, watermark);
                            }
                            WatermarkSerdeType::Value => {
                                value_table_watermarks.insert(table_id, watermark);
                            }
                        }
                    }
                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;
                    compact_task.value_table_watermarks = value_table_watermarks;

                    // Attach schemas only for tables present in the versioned-schema
                    // map (empty unless dropped-column reclaim is enabled above).
                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            all_versioned_table_schemas.get(table_id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID, // deprecated
                        },
                    );

                    // At most one normal task per group per round.
                    pick_tasks.push(compact_task);
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            // If the last picked task doesn't belong to this group (or none was
            // picked at all), report the group as unscheduled this round.
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            // Trivial tasks mutate the version, so the version (+ stats) must be
            // committed together with statuses and assignments.
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // We are using a single transaction to ensure that each task has progress when it is
            // created.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Initiate heartbeat for the task to track its progress.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // Normal tasks are handed out in `Pending` state; trivial tasks were
            // already marked `Success` above and are not in `pick_tasks` here.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }
714
715    /// Cancels a compaction task no matter it's assigned or unassigned.
716    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
717        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
718            anyhow::anyhow!("failpoint metastore err")
719        )));
720        let ret = self
721            .cancel_compact_task_impl(vec![task_id], task_status)
722            .await?;
723        Ok(ret[0])
724    }
725
    /// Cancels a batch of compaction tasks; returns one flag per entry of
    /// `tasks`, in the same order. `task_status` must be a cancellation status
    /// (see `CANCEL_STATUS_SET`).
    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }
733
734    async fn cancel_compact_task_impl(
735        &self,
736        task_ids: Vec<u64>,
737        task_status: TaskStatus,
738    ) -> Result<Vec<bool>> {
739        assert!(CANCEL_STATUS_SET.contains(&task_status));
740        let tasks = task_ids
741            .into_iter()
742            .map(|task_id| ReportTask {
743                task_id,
744                task_status,
745                sorted_output_ssts: vec![],
746                table_stats_change: HashMap::default(),
747                object_timestamps: HashMap::default(),
748            })
749            .collect_vec();
750        let rets = self.report_compact_tasks(tasks).await?;
751        #[cfg(test)]
752        {
753            self.check_state_consistency().await;
754        }
755        Ok(rets)
756    }
757
758    async fn get_compact_tasks(
759        &self,
760        mut compaction_groups: Vec<CompactionGroupId>,
761        max_select_count: usize,
762        selector: &mut dyn CompactionSelector,
763    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
764        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
765            anyhow::anyhow!("failpoint metastore error")
766        )));
767        compaction_groups.shuffle(&mut thread_rng());
768        let (mut tasks, groups) = self
769            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
770            .await?;
771        tasks.retain(|task| {
772            if task.task_status == TaskStatus::Success {
773                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
774                false
775            } else {
776                true
777            }
778        });
779        Ok((tasks, groups))
780    }
781
782    pub async fn get_compact_task(
783        &self,
784        compaction_group_id: CompactionGroupId,
785        selector: &mut dyn CompactionSelector,
786    ) -> Result<Option<CompactTask>> {
787        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
788            anyhow::anyhow!("failpoint metastore error")
789        )));
790
791        let (normal_tasks, _) = self
792            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
793            .await?;
794        for task in normal_tasks {
795            if task.task_status != TaskStatus::Success {
796                return Ok(Some(task));
797            }
798            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
799        }
800        Ok(None)
801    }
802
803    pub async fn manual_get_compact_task(
804        &self,
805        compaction_group_id: CompactionGroupId,
806        manual_compaction_option: ManualCompactionOption,
807    ) -> Result<Option<CompactTask>> {
808        let (task, _) = self
809            .manual_get_compact_task_with_info(compaction_group_id, manual_compaction_option)
810            .await?;
811        Ok(task)
812    }
813
814    pub async fn manual_get_compact_task_with_info(
815        &self,
816        compaction_group_id: CompactionGroupId,
817        manual_compaction_option: ManualCompactionOption,
818    ) -> Result<(Option<CompactTask>, bool)> {
819        let mut selector = ManualCompactionSelector::new(manual_compaction_option);
820        let task = self
821            .get_compact_task(compaction_group_id, &mut selector)
822            .await?;
823        Ok((task, selector.blocked_by_pending()))
824    }
825
826    pub async fn report_compact_task(
827        &self,
828        task_id: u64,
829        task_status: TaskStatus,
830        sorted_output_ssts: Vec<SstableInfo>,
831        table_stats_change: Option<PbTableStatsMap>,
832        object_timestamps: HashMap<HummockSstableObjectId, u64>,
833    ) -> Result<bool> {
834        let rets = self
835            .report_compact_tasks(vec![ReportTask {
836                task_id,
837                task_status,
838                sorted_output_ssts,
839                table_stats_change: table_stats_change.unwrap_or_default(),
840                object_timestamps,
841            }])
842            .await?;
843        Ok(rets[0])
844    }
845
846    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
847        let compaction_guard = self.compaction.write().await;
848        let versioning_guard = self.versioning.write().await;
849
850        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
851            .await
852    }
853
    /// Finishes or cancels a batch of compaction tasks, according to each task's
    /// `task_status`, and applies successful results to the hummock version.
    ///
    /// The caller passes in both write guards so the status updates and the version
    /// delta are committed atomically via `commit_multi_var!`.
    ///
    /// Returns one flag per input task; `false` indicates the task was not found in
    /// `compact_task_assignment` (e.g. already reported or never assigned).
    pub async fn report_compact_tasks_impl(
        &self,
        report_tasks: Vec<ReportTask>,
        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
    ) -> Result<Vec<bool>> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;
        let compaction: &mut Compaction = &mut compaction_guard;
        let start_time = Instant::now();
        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
        // One accepted-flag per input task, in input order.
        let mut rets = vec![false; report_tasks.len()];
        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
        let versioning: &mut Versioning = &mut versioning_guard;
        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");

        // purge stale compact_status: drop entries for groups that no longer exist
        // in the current version (removed by unregister or group scheduling).
        for group_id in original_keys {
            if !versioning.current_version.levels.contains_key(&group_id) {
                compact_statuses.remove(group_id);
            }
        }
        let mut tasks = vec![];

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );

        // In deterministic tests the version delta is not persisted through the txn.
        if deterministic_mode {
            version.disable_apply_to_txn();
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        let mut success_count = 0;
        let mut report_results = Vec::with_capacity(rets.len());
        for (idx, task) in report_tasks.into_iter().enumerate() {
            rets[idx] = true;
            let task_id = task.task_id;
            let mut task_status = task.task_status;
            // Removing the assignment marks the task as no longer in progress;
            // an absent entry means the task is unknown (already reported?).
            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
                None => {
                    tracing::warn!("{}", format!("compact task {} not found", task.task_id));
                    rets[idx] = false;
                    report_results.push(CompactionTaskReportResult {
                        task_id,
                        task_status,
                        reported: false,
                    });
                    continue;
                }
            };

            {
                // apply result
                compact_task.task_status = task.task_status;
                compact_task.sorted_output_ssts = task.sorted_output_ssts;
            }

            match compact_statuses.get_mut(compact_task.compaction_group_id) {
                Some(mut compact_status) => {
                    compact_status.report_compact_task(&compact_task);
                }
                None => {
                    // When the group_id is not found in the compaction_statuses, it means the group has been removed.
                    // The task is invalid and should be canceled.
                    // e.g.
                    // 1. The group is removed by the user unregistering the tables
                    // 2. The group is removed by the group scheduling algorithm
                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
                }
            }

            // A task only counts as successful if it also passes the object
            // timestamp sanity check and its input was not invalidated by a
            // concurrent group split.
            let is_success = if let TaskStatus::Success = compact_task.task_status {
                match self
                    .report_compaction_sanity_check(&task.object_timestamps)
                    .await
                {
                    Err(e) => {
                        warn!(
                            "failed to commit compaction task {} {}",
                            compact_task.task_id,
                            e.as_report()
                        );
                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
                        false
                    }
                    _ => {
                        let group = version
                            .latest_version()
                            .levels
                            .get(&compact_task.compaction_group_id)
                            .unwrap();
                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
                        if is_expired {
                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
                            warn!(
                                "The task may be expired because of group split, task:\n {:?}",
                                compact_task_to_string(&compact_task)
                            );
                        }
                        !is_expired
                    }
                }
            } else {
                false
            };
            if is_success {
                success_count += 1;
                // Apply the output SSTs to the in-flight version delta.
                version.apply_compact_task(&compact_task);
                // When stale table stats were purged, reset the exported metrics so
                // they do not keep reporting removed tables.
                if purge_prost_table_stats(
                    &mut version_stats.table_stats,
                    version.latest_version(),
                    &HashSet::default(),
                ) {
                    self.metrics.version_stats.reset();
                    versioning.local_metrics.clear();
                }
                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
                trigger_local_table_stat(
                    &self.metrics,
                    &mut versioning.local_metrics,
                    &version_stats,
                    &task.table_stats_change,
                );
            }
            // Report the (possibly rewritten) final status back to any waiter.
            task_status = compact_task.task_status;
            report_results.push(CompactionTaskReportResult {
                task_id,
                task_status,
                reported: rets[idx],
            });
            tasks.push(compact_task);
        }
        if success_count > 0 {
            // At least one task changed the version: persist statuses, assignments,
            // the version delta and the stats atomically.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;

            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_report_task"])
                .observe(success_count as f64);
        } else {
            // The compaction task is cancelled or failed: only the status and
            // assignment tables changed.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment
            )?;
        }

        // Wake any manual-compaction callers waiting on these task ids.
        self.notify_compaction_task_report_waiters(report_results);

        let mut success_groups = vec![];
        for compact_task in &tasks {
            self.compactor_manager
                .remove_task_heartbeat(compact_task.task_id);
            tracing::trace!(
                "Reported compaction task. {}. cost time: {:?}",
                compact_task_to_string(compact_task),
                start_time.elapsed(),
            );

            if !deterministic_mode
                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
            {
                // only try send Dynamic compaction
                self.try_send_compaction_request(
                    compact_task.compaction_group_id,
                    compact_task::TaskType::Dynamic,
                );
            }

            if compact_task.task_status == TaskStatus::Success {
                success_groups.push(compact_task.compaction_group_id);
            }
        }

        trigger_compact_tasks_stat(
            &self.metrics,
            &tasks,
            &compaction.compaction_statuses,
            &versioning_guard.current_version,
        );
        // Release the versioning lock before the (possibly slow) write-limit update.
        drop(versioning_guard);
        if !success_groups.is_empty() {
            self.try_update_write_limits(&success_groups).await;
        }
        Ok(rets)
    }
1068
1069    /// Triggers compacitons to specified compaction groups.
1070    /// Don't wait for compaction finish
1071    pub async fn trigger_compaction_deterministic(
1072        &self,
1073        _base_version_id: HummockVersionId,
1074        compaction_groups: Vec<CompactionGroupId>,
1075    ) -> Result<()> {
1076        self.on_current_version(|old_version| {
1077            tracing::info!(
1078                "Trigger compaction for version {}, groups {:?}",
1079                old_version.id,
1080                compaction_groups
1081            );
1082        })
1083        .await;
1084
1085        if compaction_groups.is_empty() {
1086            return Ok(());
1087        }
1088        for compaction_group in compaction_groups {
1089            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1090        }
1091        Ok(())
1092    }
1093
    /// Triggers a manual compaction on `compaction_group` and waits for its report.
    ///
    /// Flow: pick an idle compactor, build a manual compaction task, send it to the
    /// compactor, then block on a oneshot waiter until the task result is reported
    /// back via `report_compact_tasks`.
    ///
    /// Returns `Retry` when the caller should try again (exclusive mode blocked by
    /// pending tasks, or the compactor cancelled for lack of CPU/memory), and
    /// `Submitted` once the task has been reported.
    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<ManualCompactionTriggerResult> {
        let start_time = Instant::now();
        let exclusive = manual_compaction_option.exclusive;

        // 1. Get idle compactor.
        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction No compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        // 2. Get manual compaction task.
        let compact_task = self
            .manual_get_compact_task_with_info(compaction_group, manual_compaction_option)
            .await;
        let (compact_task, blocked_by_pending) = match compact_task {
            Ok((compact_task, blocked_by_pending)) => (compact_task, blocked_by_pending),
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };
        let compact_task = match compact_task {
            Some(compact_task) => compact_task,
            None => {
                // In exclusive mode a pending task blocking the selector means the
                // caller should retry later rather than treat this as an error.
                if exclusive && blocked_by_pending {
                    return Ok(ManualCompactionTriggerResult::Retry);
                }
                // No compaction task available.
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        // 3. send task to compactor
        let task_id = compact_task.task_id;
        let compact_task_string = compact_task_to_string(&compact_task);
        tracing::info!(
            compact_task_string,
            duration = ?start_time.elapsed(),
            "Triggered manual compaction task."
        );

        // Register the waiter BEFORE sending the task so the report cannot be
        // missed if the compactor replies immediately.
        let report_rx = self.register_compaction_task_report_waiter(task_id);
        if let Err(err) = compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })
        {
            // Clean up the waiter on every failure path so the notifier map
            // does not leak entries.
            self.remove_compaction_task_report_waiter(task_id);
            return Err(err.into());
        }

        let report_result = match report_rx.await {
            Ok(result) => result,
            Err(_) => {
                // Sender side dropped without notifying; remove our registration.
                self.remove_compaction_task_report_waiter(task_id);
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction wait report failed. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };
        if !report_result.reported {
            return Err(anyhow::anyhow!(
                "trigger_manual_compaction report not accepted. task_id {}",
                report_result.task_id
            )
            .into());
        }

        // Resource-exhaustion cancellations are transient: tell the caller to retry.
        if report_result.task_status == TaskStatus::NoAvailCpuResourceCanceled
            || report_result.task_status == TaskStatus::NoAvailMemoryResourceCanceled
        {
            return Ok(ManualCompactionTriggerResult::Retry);
        }

        tracing::info!(
            ?report_result,
            duration = ?start_time.elapsed(),
            "Completed manual compaction task."
        );

        Ok(ManualCompactionTriggerResult::Submitted)
    }
1203
1204    /// Sends a compaction request for new data (clears cooldown).
1205    pub fn try_send_compaction_request(
1206        &self,
1207        compaction_group: CompactionGroupId,
1208        task_type: compact_task::TaskType,
1209    ) -> bool {
1210        self.compaction_state.try_sched_compaction(
1211            compaction_group,
1212            task_type,
1213            ScheduleTrigger::NewData,
1214        )
1215    }
1216
    /// Apply `split_weight_by_vnode` based partition strategy.
    ///
    /// When the compaction config sets a positive `split_weight_by_vnode`, every
    /// existing table gets the task's fixed weight. Otherwise each table's vnode
    /// partition count is chosen dynamically from its share of the input SST size
    /// and its latest write throughput.
    fn apply_split_weight_by_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
    ) {
        if compaction_config.split_weight_by_vnode > 0 {
            // Fixed-weight mode: apply the task's weight uniformly and skip the
            // size/throughput heuristics below.
            for table_id in &compact_task.existing_table_ids {
                compact_task
                    .table_vnode_partition
                    .insert(*table_id, compact_task.split_weight_by_vnode);
            }

            return;
        }

        // Calculate per-table size from input SSTs. An SST shared by k tables
        // contributes sst_size / k to each of them — an even-split approximation.
        let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
        let mut existing_table_ids: HashSet<TableId> = HashSet::default();
        for input_ssts in &compact_task.input_ssts {
            for sst in &input_ssts.table_infos {
                existing_table_ids.extend(sst.table_ids.iter());
                for table_id in &sst.table_ids {
                    *table_size_info.entry(*table_id).or_default() +=
                        sst.sst_size / (sst.table_ids.len() as u64);
                }
            }
        }
        // Keep only table ids that actually appear in some input SST.
        compact_task
            .existing_table_ids
            .retain(|table_id| existing_table_ids.contains(table_id));

        let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
        let default_partition_count = self.env.opts.partition_vnode_count;
        let compact_task_table_size_partition_threshold_low = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_low;
        let compact_task_table_size_partition_threshold_high = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_high;

        // Check latest write throughput
        let table_write_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.read();
        let timestamp = chrono::Utc::now().timestamp();

        for (table_id, compact_table_size) in table_size_info {
            // Most recent throughput sample for this table, 0 when none recorded.
            let write_throughput = table_write_throughput_statistic_manager
                .get_table_throughput_descending(table_id, timestamp)
                .peekable()
                .peek()
                .map(|item| item.throughput)
                .unwrap_or(0);

            // Three tiers: big tables get the full partition count, medium or
            // high-throughput tables get the hybrid count, and tables larger than
            // one target file still get a single dedicated partition.
            if compact_table_size > compact_task_table_size_partition_threshold_high
                && default_partition_count > 0
            {
                compact_task
                    .table_vnode_partition
                    .insert(table_id, default_partition_count);
            } else if (compact_table_size > compact_task_table_size_partition_threshold_low
                || (write_throughput > self.env.opts.table_high_write_throughput_threshold
                    && compact_table_size > compaction_config.target_file_size_base))
                && hybrid_vnode_count > 0
            {
                // partition for large write throughput table. But we also need to make sure
                // the table size is large enough to avoid too many small files.
                compact_task
                    .table_vnode_partition
                    .insert(table_id, hybrid_vnode_count);
            } else if compact_table_size > compaction_config.target_file_size_base {
                compact_task.table_vnode_partition.insert(table_id, 1);
            }
        }

        // Drop partition entries for tables no longer present in this task.
        compact_task
            .table_vnode_partition
            .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
    }
1297
1298    pub(crate) fn calculate_vnode_partition(
1299        &self,
1300        compact_task: &mut CompactTask,
1301        compaction_config: &CompactionConfig,
1302    ) {
1303        // Do not split sst by vnode partition when target_level > base_level
1304        // The purpose of data alignment is mainly to improve the parallelism of base level compaction
1305        // and reduce write amplification. However, at high level, the size of the sst file is often
1306        // larger and only contains the data of a single table_id, so there is no need to cut it.
1307        if compact_task.target_level > compact_task.base_level {
1308            return;
1309        }
1310
1311        // Apply split_weight_by_vnode based partition strategy
1312        self.apply_split_weight_by_vnode_partition(compact_task, compaction_config);
1313    }
1314
    /// Returns a cloned shared handle to the compactor manager.
    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
1318
1319    fn register_compaction_task_report_waiter(
1320        &self,
1321        task_id: HummockCompactionTaskId,
1322    ) -> Receiver<CompactionTaskReportResult> {
1323        let (tx, rx) = tokio::sync::oneshot::channel();
1324        self.compaction_task_report_notifiers
1325            .lock()
1326            .register(task_id, tx);
1327        rx
1328    }
1329
    /// Drops the report waiter for `task_id`, e.g. when sending the task failed.
    fn remove_compaction_task_report_waiter(&self, task_id: HummockCompactionTaskId) {
        self.compaction_task_report_notifiers.lock().remove(task_id);
    }
1333
1334    fn notify_compaction_task_report_waiters(&self, results: Vec<CompactionTaskReportResult>) {
1335        let mut guard = self.compaction_task_report_notifiers.lock();
1336        for result in results {
1337            guard.notify(result);
1338        }
1339    }
1340}
1341
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Fetches the assignment entry for `task_id`, if any (test helper).
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        self.compaction
            .read()
            .await
            .compact_task_assignment
            .get(&task_id)
            .cloned()
    }

    /// Reports a task result in tests, optionally (re)inserting its assignment first.
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let assignment = CompactTaskAssignment {
                compact_task: Some(task.into()),
                context_id: 0.into(),
            };
            self.compaction
                .write()
                .await
                .compact_task_assignment
                .insert(task_id, assignment);
        }

        // In the test, the contents of the compact task may have been modified directly, while the contents of compact_task_assignment were not modified.
        // So we pass the modified compact_task directly into the `report_compact_task_impl`
        let report = ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        };
        self.report_compact_tasks(vec![report]).await?;
        Ok(())
    }
}
1385
/// What triggered the compaction schedule request.
///
/// Only affects cooldown handling for the `Dynamic` task type; see
/// `CompactionState::try_sched_compaction`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleTrigger {
    /// New data arrived (e.g., `commit_epoch`). Clears cooldown.
    NewData,
    /// Periodic timer. Respects cooldown for Dynamic type.
    Periodic,
}
1394
/// A point-in-time snapshot of the compaction schedule state.
///
/// `snapshot_time` is used by `unschedule()` to detect whether new data arrived
/// after the snapshot was taken, preventing incorrect cooldown.
pub struct CompactionScheduleSnapshot {
    // (group, task type) pairs scheduled at snapshot time.
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    // Capture time, recorded while `CompactionState`'s lock was held.
    snapshot_time: Instant,
}
1403
1404impl CompactionScheduleSnapshot {
1405    /// Task type priority order for scheduling (checked first = higher priority).
1406    const TASK_TYPE_PRIORITY: &[TaskType] = &[
1407        TaskType::Dynamic,
1408        TaskType::SpaceReclaim,
1409        TaskType::Ttl,
1410        TaskType::Tombstone,
1411        TaskType::VnodeWatermark,
1412    ];
1413
1414    pub fn snapshot_time(&self) -> Instant {
1415        self.snapshot_time
1416    }
1417
1418    /// Pick compaction groups and task type from this snapshot.
1419    ///
1420    /// Returns groups in shuffled order. Non-Dynamic types have higher priority
1421    /// and return a single group; Dynamic groups are batched together.
1422    pub fn pick_compaction_groups_and_type(&self) -> Option<(Vec<CompactionGroupId>, TaskType)> {
1423        let group_ids = self.group_ids_shuffled();
1424        let mut normal_groups = vec![];
1425        for cg_id in group_ids {
1426            if let Some(pick_type) = self.pick_type(cg_id) {
1427                if pick_type == TaskType::Dynamic {
1428                    normal_groups.push(cg_id);
1429                } else if normal_groups.is_empty() {
1430                    return Some((vec![cg_id], pick_type));
1431                }
1432            }
1433        }
1434        if normal_groups.is_empty() {
1435            None
1436        } else {
1437            Some((normal_groups, TaskType::Dynamic))
1438        }
1439    }
1440
1441    fn group_ids_shuffled(&self) -> Vec<CompactionGroupId> {
1442        let mut group_ids: Vec<_> = self.scheduled.iter().map(|(g, _)| *g).unique().collect();
1443        group_ids.shuffle(&mut thread_rng());
1444        group_ids
1445    }
1446
1447    fn pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1448        Self::TASK_TYPE_PRIORITY
1449            .iter()
1450            .find(|t| self.scheduled.contains(&(group, **t)))
1451            .copied()
1452    }
1453}
1454
/// Tracks which (`compaction_group`, `task_type`) pairs are scheduled for compaction.
///
/// For `Dynamic` type, includes a cooldown mechanism: groups with no compaction work
/// are skipped by periodic triggers until new data arrives via `commit_epoch`.
#[derive(Debug, Default)]
pub struct CompactionState {
    // All mutable state lives behind one lock; see `CompactionStateInner`.
    inner: Mutex<CompactionStateInner>,
}
1463
#[derive(Debug, Default)]
struct CompactionStateInner {
    /// Currently scheduled (group, task type) pairs.
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    /// Groups skipped by periodic Dynamic trigger until new data arrives.
    dynamic_cooldown: HashSet<CompactionGroupId>,
    /// Tracks new-data arrival time per group for cooldown race detection.
    last_new_data_time: HashMap<CompactionGroupId, Instant>,
}
1472
1473impl CompactionState {
1474    pub fn new() -> Self {
1475        Self {
1476            inner: Default::default(),
1477        }
1478    }
1479
1480    /// Enqueues a compaction request. Returns `true` if newly scheduled.
1481    ///
1482    /// `trigger` only affects `Dynamic` type — see [`ScheduleTrigger`].
1483    pub fn try_sched_compaction(
1484        &self,
1485        compaction_group: CompactionGroupId,
1486        task_type: TaskType,
1487        trigger: ScheduleTrigger,
1488    ) -> bool {
1489        let mut guard = self.inner.lock();
1490        if task_type == TaskType::Dynamic {
1491            match trigger {
1492                ScheduleTrigger::NewData => {
1493                    guard.dynamic_cooldown.remove(&compaction_group);
1494                    guard
1495                        .last_new_data_time
1496                        .insert(compaction_group, Instant::now());
1497                }
1498                ScheduleTrigger::Periodic => {
1499                    if guard.dynamic_cooldown.contains(&compaction_group) {
1500                        return false;
1501                    }
1502                }
1503            }
1504        }
1505        guard.scheduled.insert((compaction_group, task_type))
1506    }
1507
1508    /// Removes a scheduled entry. For Dynamic type, adds to cooldown unless
1509    /// new data arrived after `snapshot_time`.
1510    pub fn unschedule(
1511        &self,
1512        compaction_group: CompactionGroupId,
1513        task_type: compact_task::TaskType,
1514        snapshot_time: Instant,
1515    ) {
1516        let mut guard = self.inner.lock();
1517        guard.scheduled.remove(&(compaction_group, task_type));
1518        if task_type == TaskType::Dynamic {
1519            let has_new_data = guard
1520                .last_new_data_time
1521                .get(&compaction_group)
1522                .is_some_and(|t| *t > snapshot_time);
1523            if !has_new_data {
1524                guard.dynamic_cooldown.insert(compaction_group);
1525            }
1526        }
1527    }
1528
1529    /// Takes a snapshot of the current schedule state.
1530    pub fn snapshot(&self) -> CompactionScheduleSnapshot {
1531        let guard = self.inner.lock();
1532        // Record time after lock to ensure accurate ordering vs. try_sched_compaction
1533        let snapshot_time = Instant::now();
1534        CompactionScheduleSnapshot {
1535            scheduled: guard.scheduled.clone(),
1536            snapshot_time,
1537        }
1538    }
1539
1540    /// Removes all schedule state for a deleted or merged group.
1541    pub fn remove_compaction_group(&self, compaction_group: CompactionGroupId) {
1542        let mut guard = self.inner.lock();
1543        guard
1544            .scheduled
1545            .retain(|(group, _)| *group != compaction_group);
1546        guard.dynamic_cooldown.remove(&compaction_group);
1547        guard.last_new_data_time.remove(&compaction_group);
1548    }
1549}
1550
1551impl Compaction {
1552    pub fn get_compact_task_assignments_by_group_id(
1553        &self,
1554        compaction_group_id: CompactionGroupId,
1555    ) -> Vec<CompactTaskAssignment> {
1556        self.compact_task_assignment
1557            .iter()
1558            .filter_map(|(_, assignment)| {
1559                if assignment
1560                    .compact_task
1561                    .as_ref()
1562                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
1563                {
1564                    Some(CompactTaskAssignment {
1565                        compact_task: assignment.compact_task.clone(),
1566                        context_id: assignment.context_id,
1567                    })
1568                } else {
1569                    None
1570                }
1571            })
1572            .collect()
1573    }
1574}
1575
/// Aggregated statistics for a single compaction group.
#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    /// Id of the compaction group these statistics describe.
    pub group_id: CompactionGroupId,
    /// Total size of the group — presumably bytes; confirm with the producer.
    pub group_size: u64,
    /// Per-state-table size contribution within this group.
    pub table_statistic: BTreeMap<StateTableId, u64>,
    /// Compaction configuration of the group.
    pub compaction_group_config: CompactionGroup,
}
1583
1584/// Updates table stats caused by vnode watermark trivial reclaim compaction.
1585fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1586    table_stats: &mut PbTableStatsMap,
1587    task: &CompactTask,
1588) {
1589    if task.task_type != TaskType::VnodeWatermark {
1590        return;
1591    }
1592    let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
1593    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1594        assert_eq!(s.table_ids.len(), 1);
1595        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1596        *e += s.total_key_count;
1597    }
1598    for (table_id, delete_count) in deleted_table_keys {
1599        let Some(stats) = table_stats.get_mut(&table_id) else {
1600            continue;
1601        };
1602        if stats.total_key_count == 0 {
1603            continue;
1604        }
1605        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1606        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1607        // total_key_count is updated accurately.
1608        stats.total_key_count = new_total_key_count;
1609        // others are updated approximately.
1610        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1611        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1612    }
1613}
1614
/// Health state of a compaction group, produced by [`GroupStateValidator`]
/// from the group's L0 shape.
#[derive(Debug, Clone)]
pub enum GroupState {
    /// The compaction group is not in emergency state.
    Normal,

    /// The compaction group is in emergency state.
    Emergency(String), // reason

    /// The compaction group is in write stop state.
    WriteStop(String), // reason
}
1626
1627impl GroupState {
1628    pub fn is_write_stop(&self) -> bool {
1629        matches!(self, Self::WriteStop(_))
1630    }
1631
1632    pub fn is_emergency(&self) -> bool {
1633        matches!(self, Self::Emergency(_))
1634    }
1635
1636    pub fn reason(&self) -> Option<&str> {
1637        match self {
1638            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
1639            _ => None,
1640        }
1641    }
1642}
1643
/// Stateless helper that classifies a compaction group's L0 shape into a
/// [`GroupState`] (normal / emergency / write-stop).
#[derive(Clone, Default)]
pub struct GroupStateValidator;
1646
1647impl GroupStateValidator {
1648    pub fn write_stop_sub_level_count(
1649        level_count: usize,
1650        compaction_config: &CompactionConfig,
1651    ) -> bool {
1652        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1653        level_count > threshold
1654    }
1655
1656    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1657        l0_size
1658            > compaction_config
1659                .level0_stop_write_threshold_max_size
1660                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1661    }
1662
1663    pub fn write_stop_l0_file_count(
1664        l0_file_count: usize,
1665        compaction_config: &CompactionConfig,
1666    ) -> bool {
1667        l0_file_count
1668            > compaction_config
1669                .level0_stop_write_threshold_max_sst_count
1670                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1671                as usize
1672    }
1673
1674    pub fn emergency_l0_file_count(
1675        l0_file_count: usize,
1676        compaction_config: &CompactionConfig,
1677    ) -> bool {
1678        l0_file_count
1679            > compaction_config
1680                .emergency_level0_sst_file_count
1681                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1682                as usize
1683    }
1684
1685    pub fn emergency_l0_partition_count(
1686        last_l0_sub_level_partition_count: usize,
1687        compaction_config: &CompactionConfig,
1688    ) -> bool {
1689        last_l0_sub_level_partition_count
1690            > compaction_config
1691                .emergency_level0_sub_level_partition
1692                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1693                as usize
1694    }
1695
1696    pub fn check_single_group_write_stop(
1697        levels: &Levels,
1698        compaction_config: &CompactionConfig,
1699    ) -> GroupState {
1700        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1701            return GroupState::WriteStop(format!(
1702                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1703                levels.l0.sub_levels.len(),
1704                compaction_config.level0_stop_write_threshold_sub_level_number
1705            ));
1706        }
1707
1708        if Self::write_stop_l0_file_count(
1709            levels
1710                .l0
1711                .sub_levels
1712                .iter()
1713                .map(|l| l.table_infos.len())
1714                .sum(),
1715            compaction_config,
1716        ) {
1717            return GroupState::WriteStop(format!(
1718                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1719                levels
1720                    .l0
1721                    .sub_levels
1722                    .iter()
1723                    .map(|l| l.table_infos.len())
1724                    .sum::<usize>(),
1725                compaction_config
1726                    .level0_stop_write_threshold_max_sst_count
1727                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1728            ));
1729        }
1730
1731        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1732            return GroupState::WriteStop(format!(
1733                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1734                levels.l0.total_file_size,
1735                compaction_config
1736                    .level0_stop_write_threshold_max_size
1737                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1738            ));
1739        }
1740
1741        GroupState::Normal
1742    }
1743
1744    pub fn check_single_group_emergency(
1745        levels: &Levels,
1746        compaction_config: &CompactionConfig,
1747    ) -> GroupState {
1748        if Self::emergency_l0_file_count(
1749            levels
1750                .l0
1751                .sub_levels
1752                .iter()
1753                .map(|l| l.table_infos.len())
1754                .sum(),
1755            compaction_config,
1756        ) {
1757            return GroupState::Emergency(format!(
1758                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1759                levels
1760                    .l0
1761                    .sub_levels
1762                    .iter()
1763                    .map(|l| l.table_infos.len())
1764                    .sum::<usize>(),
1765                compaction_config
1766                    .emergency_level0_sst_file_count
1767                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1768            ));
1769        }
1770
1771        if Self::emergency_l0_partition_count(
1772            levels
1773                .l0
1774                .sub_levels
1775                .first()
1776                .map(|l| l.table_infos.len())
1777                .unwrap_or(0),
1778            compaction_config,
1779        ) {
1780            return GroupState::Emergency(format!(
1781                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1782                levels
1783                    .l0
1784                    .sub_levels
1785                    .first()
1786                    .map(|l| l.table_infos.len())
1787                    .unwrap_or(0),
1788                compaction_config
1789                    .emergency_level0_sub_level_partition
1790                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1791            ));
1792        }
1793
1794        GroupState::Normal
1795    }
1796
1797    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1798        let state = Self::check_single_group_write_stop(levels, compaction_config);
1799        if state.is_write_stop() {
1800            return state;
1801        }
1802
1803        Self::check_single_group_emergency(levels, compaction_config)
1804    }
1805}
1806
#[cfg(test)]
mod prefetched_task_id_tests {
    use crate::manager::MetaOpts;

    /// Guards the default value of `compaction_task_id_refill_capacity`
    /// against accidental changes.
    #[test]
    fn test_compaction_task_id_refill_capacity_default() {
        assert_eq!(MetaOpts::test(false).compaction_task_id_refill_capacity, 64);
    }
}
1816
#[cfg(test)]
mod compaction_state_tests {
    //! Unit tests for [`CompactionState`]: schedule/unschedule bookkeeping,
    //! the Dynamic-task cooldown mechanism, snapshot-time race detection, and
    //! snapshot task-type selection. Several tests assert on exact
    //! snapshot-time ordering, so statement order matters.

    use risingwave_pb::hummock::compact_task::TaskType;

    use super::*;

    #[test]
    fn test_basic_schedule_and_unschedule() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // First schedule should succeed
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        // Duplicate schedule should fail
        assert!(!state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        // Different task type should succeed
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));

        // Snapshot should contain both
        let snapshot = state.snapshot();
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Ttl)));

        // Unschedule removes from scheduled set
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        let snapshot2 = state.snapshot();
        assert!(!snapshot2.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot2.scheduled.contains(&(group_id, TaskType::Ttl)));
    }

    #[test]
    fn test_cooldown_blocks_periodic_trigger() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Schedule then unschedule - should add to cooldown
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        // Verify in cooldown
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        // Periodic trigger should be blocked
        assert!(!state.try_sched_compaction(
            group_id,
            TaskType::Dynamic,
            ScheduleTrigger::Periodic
        ));
    }

    #[test]
    fn test_new_data_clears_cooldown() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Put group in cooldown
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        // NewData trigger should clear cooldown and schedule
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id));
    }

    #[test]
    fn test_cooldown_only_affects_dynamic_type() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Put group in cooldown for Dynamic
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        // Ttl unschedule should NOT add to cooldown
        let group_id_2: CompactionGroupId = 2.into();
        assert!(state.try_sched_compaction(group_id_2, TaskType::Ttl, ScheduleTrigger::Periodic));
        let snapshot2 = state.snapshot();
        state.unschedule(group_id_2, TaskType::Ttl, snapshot2.snapshot_time());
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id_2));

        // Other task types should work regardless of cooldown
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        assert!(state.try_sched_compaction(
            group_id,
            TaskType::SpaceReclaim,
            ScheduleTrigger::Periodic
        ));
    }

    #[test]
    fn test_race_condition_new_data_after_snapshot() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();

        // Simulate new data arriving AFTER snapshot
        {
            let mut guard = state.inner.lock();
            guard.last_new_data_time.insert(group_id, Instant::now());
        }

        // Unschedule should NOT add to cooldown (new data arrived after snapshot)
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(
            !state.inner.lock().dynamic_cooldown.contains(&group_id),
            "Should skip cooldown when new data arrived after snapshot"
        );
    }

    #[test]
    fn test_remove_compaction_group_cleans_all_state() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Set up state
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        state.inner.lock().dynamic_cooldown.insert(group_id);

        // Remove group
        state.remove_compaction_group(group_id);

        // Verify all state cleaned up
        let guard = state.inner.lock();
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Ttl)));
        assert!(!guard.dynamic_cooldown.contains(&group_id));
        assert!(!guard.last_new_data_time.contains_key(&group_id));
    }

    #[test]
    fn test_snapshot_pick_type_priority() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Empty group returns None
        assert_eq!(state.snapshot().pick_type(group_id), None);

        // Priority order: Dynamic > SpaceReclaim > Ttl > Tombstone > VnodeWatermark
        state.try_sched_compaction(
            group_id,
            TaskType::VnodeWatermark,
            ScheduleTrigger::Periodic,
        );
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::VnodeWatermark)
        );

        state.try_sched_compaction(group_id, TaskType::Tombstone, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Tombstone)
        );

        state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic);
        assert_eq!(state.snapshot().pick_type(group_id), Some(TaskType::Ttl));

        state.try_sched_compaction(group_id, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::SpaceReclaim)
        );

        state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Dynamic)
        );
    }

    #[test]
    fn test_multiple_groups_independent_cooldown() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);
        let snapshot = state.snapshot();

        // Only unschedule g1
        state.unschedule(g1, TaskType::Dynamic, snapshot.snapshot_time());

        let guard = state.inner.lock();
        assert!(guard.dynamic_cooldown.contains(&g1));
        assert!(!guard.dynamic_cooldown.contains(&g2));
    }

    #[test]
    fn test_pick_compaction_groups_empty() {
        let state = CompactionState::new();
        let snapshot = state.snapshot();
        // No scheduled groups → returns None
        assert!(snapshot.pick_compaction_groups_and_type().is_none());
    }

    #[test]
    fn test_pick_compaction_groups_mixed_types() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();
        let g3: CompactionGroupId = 3.into();

        // g1: Dynamic, g2: Ttl, g3: Dynamic
        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Ttl, ScheduleTrigger::Periodic);
        state.try_sched_compaction(g3, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();

        // Due to shuffle, either:
        // - Ttl group is encountered first → returns (vec![g2], Ttl)
        // - Dynamic group is encountered first → collects all Dynamic, skips Ttl
        //   → returns ([g1, g3] in some order, Dynamic)
        if task_type == TaskType::Dynamic {
            assert!(groups.contains(&g1));
            assert!(groups.contains(&g3));
            assert!(!groups.contains(&g2)); // Ttl group excluded from Dynamic result
        } else {
            assert_eq!(task_type, TaskType::Ttl);
            assert_eq!(groups, vec![g2]);
        }
    }

    #[test]
    fn test_pick_compaction_groups_all_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::Dynamic);
        assert!(groups.contains(&g1));
        assert!(groups.contains(&g2));
    }

    #[test]
    fn test_pick_compaction_groups_single_non_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();

        state.try_sched_compaction(g1, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::SpaceReclaim);
        assert_eq!(groups, vec![g1]);
    }
}