risingwave_meta/hummock/manager/compaction/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17use std::time::Instant;
18
19use anyhow::Context;
20use compaction_event_loop::{
21    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
22    HummockCompactorDedicatedEventLoop,
23};
24use fail::fail_point;
25use itertools::Itertools;
26use parking_lot::Mutex;
27use rand::rng as thread_rng;
28use rand::seq::SliceRandom;
29use risingwave_common::catalog::TableId;
30use risingwave_common::config::meta::default::compaction_config;
31use risingwave_common::hash::VnodeCountCompat;
32use risingwave_common::util::epoch::Epoch;
33use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
34use risingwave_hummock_sdk::compaction_group::StateTableId;
35use risingwave_hummock_sdk::key_range::KeyRange;
36use risingwave_hummock_sdk::level::Levels;
37use risingwave_hummock_sdk::sstable_info::SstableInfo;
38use risingwave_hummock_sdk::table_stats::{
39    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
40};
41use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
42use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
43use risingwave_hummock_sdk::{
44    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
45    HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
46};
47use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
48use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
49use risingwave_pb::hummock::{
50    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
51    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
52};
53use thiserror_ext::AsReport;
54use tokio::sync::RwLockWriteGuard;
55use tokio::sync::mpsc::UnboundedReceiver;
56use tokio::sync::oneshot::Sender;
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use crate::hummock::compaction::selector::level_selector::PickerInfo;
62use crate::hummock::compaction::selector::{
63    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
64    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
65    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
66};
67use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
68use crate::hummock::error::{Error, Result};
69use crate::hummock::manager::transaction::{
70    HummockVersionStatsTransaction, HummockVersionTransaction,
71};
72use crate::hummock::manager::versioning::Versioning;
73use crate::hummock::metrics_utils::{
74    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
75    trigger_local_table_stat,
76};
77use crate::hummock::model::CompactionGroup;
78use crate::hummock::sequence::next_compaction_task_id;
79use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
80use crate::manager::META_NODE_ID;
81use crate::model::BTreeMapTransaction;
82
83pub mod compaction_event_loop;
84pub mod compaction_group_manager;
85pub mod compaction_group_schedule;
86
87static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
88    [
89        TaskStatus::ManualCanceled,
90        TaskStatus::SendFailCanceled,
91        TaskStatus::AssignFailCanceled,
92        TaskStatus::HeartbeatCanceled,
93        TaskStatus::InvalidGroupCanceled,
94        TaskStatus::NoAvailMemoryResourceCanceled,
95        TaskStatus::NoAvailCpuResourceCanceled,
96        TaskStatus::HeartbeatProgressCanceled,
97    ]
98    .into_iter()
99    .collect()
100});
101
102fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
103    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
104        HashMap::default();
105    compaction_selectors.insert(
106        compact_task::TaskType::Dynamic,
107        Box::<DynamicLevelSelector>::default(),
108    );
109    compaction_selectors.insert(
110        compact_task::TaskType::SpaceReclaim,
111        Box::<SpaceReclaimCompactionSelector>::default(),
112    );
113    compaction_selectors.insert(
114        compact_task::TaskType::Ttl,
115        Box::<TtlCompactionSelector>::default(),
116    );
117    compaction_selectors.insert(
118        compact_task::TaskType::Tombstone,
119        Box::<TombstoneCompactionSelector>::default(),
120    );
121    compaction_selectors.insert(
122        compact_task::TaskType::VnodeWatermark,
123        Box::<VnodeWatermarkCompactionSelector>::default(),
124    );
125    compaction_selectors
126}
127
128impl HummockVersionTransaction<'_> {
129    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
130        let mut version_delta = self.new_delta();
131        let trivial_move = compact_task.is_trivial_move_task();
132        version_delta.trivial_move = trivial_move;
133
134        let group_deltas = &mut version_delta
135            .group_deltas
136            .entry(compact_task.compaction_group_id)
137            .or_default()
138            .group_deltas;
139        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
140            BTreeMap::default();
141
142        for level in &compact_task.input_ssts {
143            let level_idx = level.level_idx;
144
145            removed_table_ids_map
146                .entry(level_idx)
147                .or_default()
148                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
149        }
150
151        for (level_idx, removed_table_ids) in removed_table_ids_map {
152            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
153                level_idx,
154                0, // default
155                removed_table_ids,
156                vec![], // default
157                0,      // default
158                compact_task.compaction_group_version_id,
159            ));
160
161            group_deltas.push(group_delta);
162        }
163
164        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
165            compact_task.target_level,
166            compact_task.target_sub_level_id,
167            HashSet::new(), // default
168            compact_task.sorted_output_ssts.clone(),
169            compact_task.split_weight_by_vnode,
170            compact_task.compaction_group_version_id,
171        ));
172
173        group_deltas.push(group_delta);
174        version_delta.pre_apply();
175    }
176}
177
/// In-memory bookkeeping of compaction state held by the hummock manager.
#[derive(Default)]
pub struct Compaction {
    /// Compaction task that is already assigned to a compactor
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// `CompactStatus` of each compaction group
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    // NOTE(review): not referenced anywhere in the visible code — presumably kept
    // for deterministic-test plumbing; confirm before removing.
    pub _deterministic_mode: bool,
}
187
188impl HummockManager {
189    pub async fn get_assigned_compact_task_num(&self) -> u64 {
190        self.compaction.read().await.compact_task_assignment.len() as u64
191    }
192
193    pub async fn list_compaction_status(
194        &self,
195    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
196        let compaction = self.compaction.read().await;
197        (
198            compaction.compaction_statuses.values().map_into().collect(),
199            compaction
200                .compact_task_assignment
201                .values()
202                .cloned()
203                .collect(),
204        )
205    }
206
207    pub async fn get_compaction_scores(
208        &self,
209        compaction_group_id: CompactionGroupId,
210    ) -> Vec<PickerInfo> {
211        let (status, levels, group) = {
212            let compaction = self.compaction.read().await;
213            let versioning = self.versioning.read().await;
214            let config_manager = self.compaction_group_manager.read().await;
215            match (
216                compaction.compaction_statuses.get(&compaction_group_id),
217                versioning.current_version.levels.get(&compaction_group_id),
218                config_manager.try_get_compaction_group_config(compaction_group_id),
219            ) {
220                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
221                _ => {
222                    return vec![];
223                }
224            }
225        };
226        let dynamic_level_core = DynamicLevelSelectorCore::new(
227            group.compaction_config,
228            Arc::new(CompactionDeveloperConfig::default()),
229        );
230        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
231        ctx.score_levels
232    }
233}
234
235impl HummockManager {
236    pub fn compaction_event_loop(
237        hummock_manager: Arc<Self>,
238        compactor_streams_change_rx: UnboundedReceiver<(
239            HummockContextId,
240            Streaming<SubscribeCompactionEventRequest>,
241        )>,
242    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
243        let mut join_handle_vec = Vec::default();
244
245        let hummock_compaction_event_handler =
246            HummockCompactionEventHandler::new(hummock_manager.clone());
247
248        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
249            hummock_manager.clone(),
250            hummock_compaction_event_handler.clone(),
251        );
252
253        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
254        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));
255
256        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
257            hummock_manager.env.opts.clone(),
258            hummock_compaction_event_handler,
259            Some(event_tx),
260        );
261
262        let event_loop = HummockCompactionEventLoop::new(
263            hummock_compaction_event_dispatcher,
264            hummock_manager.metrics.clone(),
265            compactor_streams_change_rx,
266        );
267
268        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
269        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
270
271        join_handle_vec
272    }
273
274    pub fn add_compactor_stream(
275        &self,
276        context_id: HummockContextId,
277        req_stream: Streaming<SubscribeCompactionEventRequest>,
278    ) {
279        self.compactor_streams_change_tx
280            .send((context_id, req_stream))
281            .unwrap();
282    }
283}
284
285impl HummockManager {
    /// Picks up to `max_select_count` compaction tasks across `compaction_groups`
    /// using `selector`, and persists the resulting state via the meta store.
    ///
    /// Trivial tasks (trivial move / trivial reclaim) are applied to the version
    /// immediately and returned with `TaskStatus::Success`; at most one
    /// non-trivial task is picked per compaction group. Returns the picked tasks
    /// (trivial ones appended last) and the groups for which nothing was scheduled.
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        // Hold the compaction write lock, then the versioning write lock, for the
        // whole selection pass; they are dropped explicitly near the end.
        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        // Table schemas are only fetched when dropped-column reclaim is enabled;
        // otherwise the map stays empty and no schema is attached to tasks.
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            // Skip groups that no longer exist in the latest version.
            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // When the last table of a compaction group is deleted, the compaction group (and its
            // config) is destroyed as well. Then a compaction task for this group may come later and
            // cannot find its config.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL.
            let task_id = next_compaction_task_id(&self.env).await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                // lazy initialize.
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            // Only dynamic/emergency selectors may turn a pick into a trivial move.
            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            // Keep asking the selector for tasks; trivial ones are consumed in-place
            // and the loop continues, a non-trivial pick breaks to the next group.
            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                // Map the algorithm name to its numeric code; anything else falls back to 0.
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // only gc delete keys in last level because there may be older version in more bottom
                    // level.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    max_kv_count_for_xor16: group_config.compaction_config.max_kv_count_for_xor16,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    // Trivial task: finish it right here without dispatching to a compactor.
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        // A trivial move carries its input SSTs through unchanged.
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    // Cap the number of trivial tasks handled in one pass.
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                        version
                            .latest_version()
                            .get_compaction_group_levels(compaction_group_id),
                    )
                    .await?;

                    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();

                    // Split the safe-epoch watermarks of the affected tables by serde type.
                    let mut pk_prefix_table_watermarks = BTreeMap::default();
                    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
                    let mut value_table_watermarks = BTreeMap::default();
                    for (table_id, watermark) in version
                        .latest_version()
                        .safe_epoch_table_watermarks(&table_ids_to_be_compacted)
                    {
                        match watermark.watermark_type {
                            WatermarkSerdeType::PkPrefix => {
                                pk_prefix_table_watermarks.insert(table_id, watermark);
                            }
                            WatermarkSerdeType::NonPkPrefix => {
                                non_pk_prefix_table_watermarks.insert(table_id, watermark);
                            }
                            WatermarkSerdeType::Value => {
                                value_table_watermarks.insert(table_id, watermark);
                            }
                        }
                    }
                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;
                    compact_task.value_table_watermarks = value_table_watermarks;

                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            all_versioned_table_schemas.get(table_id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID, // deprecated
                        },
                    );

                    // At most one non-trivial task per group; move on to the next group.
                    pick_tasks.push(compact_task);
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            // If the last picked task (if any) is not for this group, nothing was
            // scheduled here — record the group as unscheduled.
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            // Trivial tasks mutated the version, so commit version + stats too.
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // We are using a single transaction to ensure that each task has progress when it is
            // created.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        // Post-processing for non-trivial tasks: heartbeats and metrics.
        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Initiate heartbeat for the task to track its progress.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // this task has been finished.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }
686
687    /// Cancels a compaction task no matter it's assigned or unassigned.
688    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
689        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
690            anyhow::anyhow!("failpoint metastore err")
691        )));
692        let ret = self
693            .cancel_compact_task_impl(vec![task_id], task_status)
694            .await?;
695        Ok(ret[0])
696    }
697
    /// Cancels a batch of compaction tasks with the given cancellation `task_status`.
    ///
    /// Returns the per-task results from `cancel_compact_task_impl`.
    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }
705
706    async fn cancel_compact_task_impl(
707        &self,
708        task_ids: Vec<u64>,
709        task_status: TaskStatus,
710    ) -> Result<Vec<bool>> {
711        assert!(CANCEL_STATUS_SET.contains(&task_status));
712        let tasks = task_ids
713            .into_iter()
714            .map(|task_id| ReportTask {
715                task_id,
716                task_status,
717                sorted_output_ssts: vec![],
718                table_stats_change: HashMap::default(),
719                object_timestamps: HashMap::default(),
720            })
721            .collect_vec();
722        let rets = self.report_compact_tasks(tasks).await?;
723        #[cfg(test)]
724        {
725            self.check_state_consistency().await;
726        }
727        Ok(rets)
728    }
729
730    async fn get_compact_tasks(
731        &self,
732        mut compaction_groups: Vec<CompactionGroupId>,
733        max_select_count: usize,
734        selector: &mut Box<dyn CompactionSelector>,
735    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
736        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
737            anyhow::anyhow!("failpoint metastore error")
738        )));
739        compaction_groups.shuffle(&mut thread_rng());
740        let (mut tasks, groups) = self
741            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
742            .await?;
743        tasks.retain(|task| {
744            if task.task_status == TaskStatus::Success {
745                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
746                false
747            } else {
748                true
749            }
750        });
751        Ok((tasks, groups))
752    }
753
754    pub async fn get_compact_task(
755        &self,
756        compaction_group_id: CompactionGroupId,
757        selector: &mut Box<dyn CompactionSelector>,
758    ) -> Result<Option<CompactTask>> {
759        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
760            anyhow::anyhow!("failpoint metastore error")
761        )));
762
763        let (normal_tasks, _) = self
764            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
765            .await?;
766        for task in normal_tasks {
767            if task.task_status != TaskStatus::Success {
768                return Ok(Some(task));
769            }
770            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
771        }
772        Ok(None)
773    }
774
775    pub async fn manual_get_compact_task(
776        &self,
777        compaction_group_id: CompactionGroupId,
778        manual_compaction_option: ManualCompactionOption,
779    ) -> Result<Option<CompactTask>> {
780        let mut selector: Box<dyn CompactionSelector> =
781            Box::new(ManualCompactionSelector::new(manual_compaction_option));
782        self.get_compact_task(compaction_group_id, &mut selector)
783            .await
784    }
785
786    pub async fn report_compact_task(
787        &self,
788        task_id: u64,
789        task_status: TaskStatus,
790        sorted_output_ssts: Vec<SstableInfo>,
791        table_stats_change: Option<PbTableStatsMap>,
792        object_timestamps: HashMap<HummockSstableObjectId, u64>,
793    ) -> Result<bool> {
794        let rets = self
795            .report_compact_tasks(vec![ReportTask {
796                task_id,
797                task_status,
798                sorted_output_ssts,
799                table_stats_change: table_stats_change.unwrap_or_default(),
800                object_timestamps,
801            }])
802            .await?;
803        Ok(rets[0])
804    }
805
806    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
807        let compaction_guard = self.compaction.write().await;
808        let versioning_guard = self.versioning.write().await;
809
810        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
811            .await
812    }
813
    /// Finishes or cancels a compaction task, according to `task_status`.
    ///
    /// If `context_id` is not None, its validity will be checked when writing meta store.
    /// Its ownership of the task is checked as well.
    ///
    /// Return Ok(false) indicates either the task is not found,
    /// or the task is not owned by `context_id` when `context_id` is not None.
    ///
    /// Both write guards are supplied by the caller (see `report_compact_tasks`)
    /// and held until the meta-store commit below completes, so all per-task
    /// updates in one batch are applied atomically with respect to other writers.
    pub async fn report_compact_tasks_impl(
        &self,
        report_tasks: Vec<ReportTask>,
        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
    ) -> Result<Vec<bool>> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;
        let compaction: &mut Compaction = &mut compaction_guard;
        let start_time = Instant::now();
        // Snapshot the group ids before wrapping the map in a transaction, so we
        // can purge entries for groups that no longer exist in the version.
        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
        // One result slot per input task; flipped to true once the task is handled.
        let mut rets = vec![false; report_tasks.len()];
        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
        // The compaction task is finished.
        let versioning: &mut Versioning = &mut versioning_guard;
        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");

        // purge stale compact_status
        for group_id in original_keys {
            if !versioning.current_version.levels.contains_key(&group_id) {
                compact_statuses.remove(group_id);
            }
        }
        let mut tasks = vec![];

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );

        // In deterministic test mode, version deltas are not applied to the txn.
        if deterministic_mode {
            version.disable_apply_to_txn();
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        let mut success_count = 0;
        for (idx, task) in report_tasks.into_iter().enumerate() {
            // Optimistically mark the task as handled; reset below if unknown.
            rets[idx] = true;
            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
                None => {
                    tracing::warn!("{}", format!("compact task {} not found", task.task_id));
                    rets[idx] = false;
                    continue;
                }
            };

            {
                // apply result
                compact_task.task_status = task.task_status;
                compact_task.sorted_output_ssts = task.sorted_output_ssts;
            }

            match compact_statuses.get_mut(compact_task.compaction_group_id) {
                Some(mut compact_status) => {
                    compact_status.report_compact_task(&compact_task);
                }
                None => {
                    // When the group_id is not found in the compaction_statuses, it means the group has been removed.
                    // The task is invalid and should be canceled.
                    // e.g.
                    // 1. The group is removed by the user unregistering the tables
                    // 2. The group is removed by the group scheduling algorithm
                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
                }
            }

            // A reported Success only counts if it passes the sanity check AND
            // its input is not outdated by a concurrent group split/merge.
            let is_success = if let TaskStatus::Success = compact_task.task_status {
                match self
                    .report_compaction_sanity_check(&task.object_timestamps)
                    .await
                {
                    Err(e) => {
                        warn!(
                            "failed to commit compaction task {} {}",
                            compact_task.task_id,
                            e.as_report()
                        );
                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
                        false
                    }
                    _ => {
                        let group = version
                            .latest_version()
                            .levels
                            .get(&compact_task.compaction_group_id)
                            .unwrap();
                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
                        if is_expired {
                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
                            warn!(
                                "The task may be expired because of group split, task:\n {:?}",
                                compact_task_to_string(&compact_task)
                            );
                        }
                        !is_expired
                    }
                }
            } else {
                false
            };
            if is_success {
                success_count += 1;
                // Apply the task's output to the in-flight version transaction
                // and fold its stats delta into the version stats.
                version.apply_compact_task(&compact_task);
                if purge_prost_table_stats(
                    &mut version_stats.table_stats,
                    version.latest_version(),
                    &HashSet::default(),
                ) {
                    self.metrics.version_stats.reset();
                    versioning.local_metrics.clear();
                }
                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
                trigger_local_table_stat(
                    &self.metrics,
                    &mut versioning.local_metrics,
                    &version_stats,
                    &task.table_stats_change,
                );
            }
            tasks.push(compact_task);
        }
        if success_count > 0 {
            // At least one task changed the version: commit version + stats too.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;

            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_report_task"])
                .observe(success_count as f64);
        } else {
            // The compaction task is cancelled or failed.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment
            )?;
        }

        // Post-commit bookkeeping: stop heartbeats for finished tasks and
        // opportunistically reschedule Dynamic compaction for affected groups.
        let mut success_groups = vec![];
        for compact_task in &tasks {
            self.compactor_manager
                .remove_task_heartbeat(compact_task.task_id);
            tracing::trace!(
                "Reported compaction task. {}. cost time: {:?}",
                compact_task_to_string(compact_task),
                start_time.elapsed(),
            );

            if !deterministic_mode
                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
            {
                // only try send Dynamic compaction
                self.try_send_compaction_request(
                    compact_task.compaction_group_id,
                    compact_task::TaskType::Dynamic,
                );
            }

            if compact_task.task_status == TaskStatus::Success {
                success_groups.push(compact_task.compaction_group_id);
            }
        }

        trigger_compact_tasks_stat(
            &self.metrics,
            &tasks,
            &compaction.compaction_statuses,
            &versioning_guard.current_version,
        );
        // Release the versioning lock before the (async) write-limit update.
        drop(versioning_guard);
        if !success_groups.is_empty() {
            self.try_update_write_limits(&success_groups).await;
        }
        Ok(rets)
    }
1010
1011    /// Triggers compacitons to specified compaction groups.
1012    /// Don't wait for compaction finish
1013    pub async fn trigger_compaction_deterministic(
1014        &self,
1015        _base_version_id: HummockVersionId,
1016        compaction_groups: Vec<CompactionGroupId>,
1017    ) -> Result<()> {
1018        self.on_current_version(|old_version| {
1019            tracing::info!(
1020                "Trigger compaction for version {}, groups {:?}",
1021                old_version.id,
1022                compaction_groups
1023            );
1024        })
1025        .await;
1026
1027        if compaction_groups.is_empty() {
1028            return Ok(());
1029        }
1030        for compaction_group in compaction_groups {
1031            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1032        }
1033        Ok(())
1034    }
1035
1036    pub async fn trigger_manual_compaction(
1037        &self,
1038        compaction_group: CompactionGroupId,
1039        manual_compaction_option: ManualCompactionOption,
1040    ) -> Result<()> {
1041        let start_time = Instant::now();
1042
1043        // 1. Get idle compactor.
1044        let compactor = match self.compactor_manager.next_compactor() {
1045            Some(compactor) => compactor,
1046            None => {
1047                tracing::warn!("trigger_manual_compaction No compactor is available.");
1048                return Err(anyhow::anyhow!(
1049                    "trigger_manual_compaction No compactor is available. compaction_group {}",
1050                    compaction_group
1051                )
1052                .into());
1053            }
1054        };
1055
1056        // 2. Get manual compaction task.
1057        let compact_task = self
1058            .manual_get_compact_task(compaction_group, manual_compaction_option)
1059            .await;
1060        let compact_task = match compact_task {
1061            Ok(Some(compact_task)) => compact_task,
1062            Ok(None) => {
1063                // No compaction task available.
1064                return Err(anyhow::anyhow!(
1065                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
1066                    compaction_group
1067                )
1068                    .into());
1069            }
1070            Err(err) => {
1071                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
1072
1073                return Err(anyhow::anyhow!(err)
1074                    .context(format!(
1075                        "Failed to get compaction task for compaction_group {}",
1076                        compaction_group,
1077                    ))
1078                    .into());
1079            }
1080        };
1081
1082        // 3. send task to compactor
1083        let compact_task_string = compact_task_to_string(&compact_task);
1084        // TODO: shall we need to cancel on meta ?
1085        compactor
1086            .send_event(ResponseEvent::CompactTask(compact_task.into()))
1087            .with_context(|| {
1088                format!(
1089                    "Failed to trigger compaction task for compaction_group {}",
1090                    compaction_group,
1091                )
1092            })?;
1093
1094        tracing::info!(
1095            "Trigger manual compaction task. {}. cost time: {:?}",
1096            &compact_task_string,
1097            start_time.elapsed(),
1098        );
1099
1100        Ok(())
1101    }
1102
1103    /// Sends a compaction request for new data (clears cooldown).
1104    pub fn try_send_compaction_request(
1105        &self,
1106        compaction_group: CompactionGroupId,
1107        task_type: compact_task::TaskType,
1108    ) -> bool {
1109        self.compaction_state.try_sched_compaction(
1110            compaction_group,
1111            task_type,
1112            ScheduleTrigger::NewData,
1113        )
1114    }
1115
1116    /// Apply vnode-aligned compaction for large single-table levels.
1117    /// This enables one-vnode-per-SST alignment for precise query pruning.
1118    async fn try_apply_vnode_aligned_partition(
1119        &self,
1120        compact_task: &mut CompactTask,
1121        compaction_config: &CompactionConfig,
1122        levels: &Levels,
1123    ) -> Result<bool> {
1124        // Check if vnode-aligned compaction is enabled for this level
1125        // Only enable for single-table scenarios to avoid cross-table complexity
1126        let Some(threshold) = compaction_config.vnode_aligned_level_size_threshold else {
1127            return Ok(false);
1128        };
1129
1130        if compact_task.target_level < compact_task.base_level
1131            || compact_task.existing_table_ids.len() != 1
1132        {
1133            return Ok(false);
1134        }
1135
1136        // Calculate total size of the entire target level
1137        let target_level_size = levels
1138            .get_level(compact_task.target_level as usize)
1139            .total_file_size;
1140
1141        if target_level_size < threshold {
1142            return Ok(false);
1143        }
1144
1145        // Enable strict one-vnode-per-SST alignment for single table
1146        let table_id = compact_task.existing_table_ids[0];
1147
1148        // Get the actual vnode count from table catalog
1149        let table = self
1150            .metadata_manager
1151            .get_table_catalog_by_ids(&[table_id])
1152            .await
1153            .with_context(|| {
1154                format!(
1155                    "Failed to get table catalog for table_id {} in compaction_group {}",
1156                    table_id, compact_task.compaction_group_id
1157                )
1158            })
1159            .map_err(Error::Internal)?
1160            .into_iter()
1161            .next()
1162            .ok_or_else(|| {
1163                Error::Internal(anyhow::anyhow!(
1164                    "Table catalog not found for table_id {} in compaction_group {}",
1165                    table_id,
1166                    compact_task.compaction_group_id
1167                ))
1168            })?;
1169
1170        compact_task
1171            .table_vnode_partition
1172            .insert(table_id, table.vnode_count() as u32);
1173
1174        Ok(true)
1175    }
1176
1177    /// Apply `split_weight_by_vnode` based partition strategy.
1178    /// This handles dynamic partitioning based on table size and write throughput.
1179    fn apply_split_weight_by_vnode_partition(
1180        &self,
1181        compact_task: &mut CompactTask,
1182        compaction_config: &CompactionConfig,
1183    ) {
1184        if compaction_config.split_weight_by_vnode > 0 {
1185            for table_id in &compact_task.existing_table_ids {
1186                compact_task
1187                    .table_vnode_partition
1188                    .insert(*table_id, compact_task.split_weight_by_vnode);
1189            }
1190
1191            return;
1192        }
1193
1194        // Calculate per-table size from input SSTs
1195        let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
1196        let mut existing_table_ids: HashSet<TableId> = HashSet::default();
1197        for input_ssts in &compact_task.input_ssts {
1198            for sst in &input_ssts.table_infos {
1199                existing_table_ids.extend(sst.table_ids.iter());
1200                for table_id in &sst.table_ids {
1201                    *table_size_info.entry(*table_id).or_default() +=
1202                        sst.sst_size / (sst.table_ids.len() as u64);
1203                }
1204            }
1205        }
1206        compact_task
1207            .existing_table_ids
1208            .retain(|table_id| existing_table_ids.contains(table_id));
1209
1210        let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
1211        let default_partition_count = self.env.opts.partition_vnode_count;
1212        let compact_task_table_size_partition_threshold_low = self
1213            .env
1214            .opts
1215            .compact_task_table_size_partition_threshold_low;
1216        let compact_task_table_size_partition_threshold_high = self
1217            .env
1218            .opts
1219            .compact_task_table_size_partition_threshold_high;
1220
1221        // Check latest write throughput
1222        let table_write_throughput_statistic_manager =
1223            self.table_write_throughput_statistic_manager.read();
1224        let timestamp = chrono::Utc::now().timestamp();
1225
1226        for (table_id, compact_table_size) in table_size_info {
1227            let write_throughput = table_write_throughput_statistic_manager
1228                .get_table_throughput_descending(table_id, timestamp)
1229                .peekable()
1230                .peek()
1231                .map(|item| item.throughput)
1232                .unwrap_or(0);
1233
1234            if compact_table_size > compact_task_table_size_partition_threshold_high
1235                && default_partition_count > 0
1236            {
1237                compact_task
1238                    .table_vnode_partition
1239                    .insert(table_id, default_partition_count);
1240            } else if (compact_table_size > compact_task_table_size_partition_threshold_low
1241                || (write_throughput > self.env.opts.table_high_write_throughput_threshold
1242                    && compact_table_size > compaction_config.target_file_size_base))
1243                && hybrid_vnode_count > 0
1244            {
1245                compact_task
1246                    .table_vnode_partition
1247                    .insert(table_id, hybrid_vnode_count);
1248            } else if compact_table_size > compaction_config.target_file_size_base {
1249                compact_task.table_vnode_partition.insert(table_id, 1);
1250            }
1251        }
1252
1253        compact_task
1254            .table_vnode_partition
1255            .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
1256    }
1257
1258    pub(crate) async fn calculate_vnode_partition(
1259        &self,
1260        compact_task: &mut CompactTask,
1261        compaction_config: &CompactionConfig,
1262        levels: &Levels,
1263    ) -> Result<()> {
1264        // Try vnode-aligned partition first (for large single-table levels)
1265        if self
1266            .try_apply_vnode_aligned_partition(compact_task, compaction_config, levels)
1267            .await?
1268        {
1269            return Ok(());
1270        }
1271
1272        // Do not split sst by vnode partition when target_level > base_level
1273        // The purpose of data alignment is mainly to improve the parallelism of base level compaction
1274        // and reduce write amplification. However, at high level, the size of the sst file is often
1275        // larger and only contains the data of a single table_id, so there is no need to cut it.
1276        if compact_task.target_level > compact_task.base_level {
1277            return Ok(());
1278        }
1279
1280        // Apply split_weight_by_vnode based partition strategy
1281        self.apply_split_weight_by_vnode_partition(compact_task, compaction_config);
1282
1283        Ok(())
1284    }
1285
    /// Returns a cloned handle to the compactor manager.
    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
1289}
1290
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Test helper: fetches the raw assignment entry for `task_id`, if any.
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let guard = self.compaction.read().await;
        guard.compact_task_assignment.get(&task_id).cloned()
    }

    /// Test helper: optionally (re)installs `compact_task` as the stored
    /// assignment for `task_id`, then reports it with the given status/outputs.
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let mut compaction = self.compaction.write().await;
            let assignment = CompactTaskAssignment {
                compact_task: Some(task.into()),
                context_id: 0.into(),
            };
            compaction.compact_task_assignment.insert(task_id, assignment);
        }

        // In tests the compact task's contents may have been modified directly
        // while the stored assignment was not, so the (possibly modified) task
        // is reported as-is through the normal reporting path.
        let report = ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        };
        self.report_compact_tasks(vec![report]).await?;
        Ok(())
    }
}
1334
/// What triggered the compaction schedule request.
///
/// Only affects `Dynamic` task scheduling (see
/// `CompactionState::try_sched_compaction`); other task types ignore it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleTrigger {
    /// New data arrived (e.g., `commit_epoch`). Clears cooldown.
    NewData,
    /// Periodic timer. Respects cooldown for Dynamic type.
    Periodic,
}
1343
/// A point-in-time snapshot of the compaction schedule state.
///
/// `snapshot_time` is used by `unschedule()` to detect whether new data arrived
/// after the snapshot was taken, preventing incorrect cooldown.
pub struct CompactionScheduleSnapshot {
    /// The (group, task type) pairs that were scheduled at snapshot time.
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    /// When the snapshot was taken (recorded while the state lock was held).
    snapshot_time: Instant,
}
1352
1353impl CompactionScheduleSnapshot {
1354    /// Task type priority order for scheduling (checked first = higher priority).
1355    const TASK_TYPE_PRIORITY: &[TaskType] = &[
1356        TaskType::Dynamic,
1357        TaskType::SpaceReclaim,
1358        TaskType::Ttl,
1359        TaskType::Tombstone,
1360        TaskType::VnodeWatermark,
1361    ];
1362
1363    pub fn snapshot_time(&self) -> Instant {
1364        self.snapshot_time
1365    }
1366
1367    /// Pick compaction groups and task type from this snapshot.
1368    ///
1369    /// Returns groups in shuffled order. Non-Dynamic types have higher priority
1370    /// and return a single group; Dynamic groups are batched together.
1371    pub fn pick_compaction_groups_and_type(&self) -> Option<(Vec<CompactionGroupId>, TaskType)> {
1372        let group_ids = self.group_ids_shuffled();
1373        let mut normal_groups = vec![];
1374        for cg_id in group_ids {
1375            if let Some(pick_type) = self.pick_type(cg_id) {
1376                if pick_type == TaskType::Dynamic {
1377                    normal_groups.push(cg_id);
1378                } else if normal_groups.is_empty() {
1379                    return Some((vec![cg_id], pick_type));
1380                }
1381            }
1382        }
1383        if normal_groups.is_empty() {
1384            None
1385        } else {
1386            Some((normal_groups, TaskType::Dynamic))
1387        }
1388    }
1389
1390    fn group_ids_shuffled(&self) -> Vec<CompactionGroupId> {
1391        let mut group_ids: Vec<_> = self.scheduled.iter().map(|(g, _)| *g).unique().collect();
1392        group_ids.shuffle(&mut thread_rng());
1393        group_ids
1394    }
1395
1396    fn pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1397        Self::TASK_TYPE_PRIORITY
1398            .iter()
1399            .find(|t| self.scheduled.contains(&(group, **t)))
1400            .copied()
1401    }
1402}
1403
/// Tracks which (`compaction_group`, `task_type`) pairs are scheduled for compaction.
///
/// For `Dynamic` type, includes a cooldown mechanism: groups with no compaction work
/// are skipped by periodic triggers until new data arrives via `commit_epoch`.
#[derive(Debug, Default)]
pub struct CompactionState {
    // All state lives behind one mutex so schedule/unschedule/snapshot are atomic.
    inner: Mutex<CompactionStateInner>,
}
1412
#[derive(Debug, Default)]
struct CompactionStateInner {
    /// Currently scheduled (group, task type) pairs.
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    /// Groups skipped by periodic Dynamic trigger until new data arrives.
    dynamic_cooldown: HashSet<CompactionGroupId>,
    /// Tracks new-data arrival time per group for cooldown race detection.
    last_new_data_time: HashMap<CompactionGroupId, Instant>,
}
1421
1422impl CompactionState {
1423    pub fn new() -> Self {
1424        Self {
1425            inner: Default::default(),
1426        }
1427    }
1428
1429    /// Enqueues a compaction request. Returns `true` if newly scheduled.
1430    ///
1431    /// `trigger` only affects `Dynamic` type — see [`ScheduleTrigger`].
1432    pub fn try_sched_compaction(
1433        &self,
1434        compaction_group: CompactionGroupId,
1435        task_type: TaskType,
1436        trigger: ScheduleTrigger,
1437    ) -> bool {
1438        let mut guard = self.inner.lock();
1439        if task_type == TaskType::Dynamic {
1440            match trigger {
1441                ScheduleTrigger::NewData => {
1442                    guard.dynamic_cooldown.remove(&compaction_group);
1443                    guard
1444                        .last_new_data_time
1445                        .insert(compaction_group, Instant::now());
1446                }
1447                ScheduleTrigger::Periodic => {
1448                    if guard.dynamic_cooldown.contains(&compaction_group) {
1449                        return false;
1450                    }
1451                }
1452            }
1453        }
1454        guard.scheduled.insert((compaction_group, task_type))
1455    }
1456
1457    /// Removes a scheduled entry. For Dynamic type, adds to cooldown unless
1458    /// new data arrived after `snapshot_time`.
1459    pub fn unschedule(
1460        &self,
1461        compaction_group: CompactionGroupId,
1462        task_type: compact_task::TaskType,
1463        snapshot_time: Instant,
1464    ) {
1465        let mut guard = self.inner.lock();
1466        guard.scheduled.remove(&(compaction_group, task_type));
1467        if task_type == TaskType::Dynamic {
1468            let has_new_data = guard
1469                .last_new_data_time
1470                .get(&compaction_group)
1471                .is_some_and(|t| *t > snapshot_time);
1472            if !has_new_data {
1473                guard.dynamic_cooldown.insert(compaction_group);
1474            }
1475        }
1476    }
1477
1478    /// Takes a snapshot of the current schedule state.
1479    pub fn snapshot(&self) -> CompactionScheduleSnapshot {
1480        let guard = self.inner.lock();
1481        // Record time after lock to ensure accurate ordering vs. try_sched_compaction
1482        let snapshot_time = Instant::now();
1483        CompactionScheduleSnapshot {
1484            scheduled: guard.scheduled.clone(),
1485            snapshot_time,
1486        }
1487    }
1488
1489    /// Removes all schedule state for a deleted or merged group.
1490    pub fn remove_compaction_group(&self, compaction_group: CompactionGroupId) {
1491        let mut guard = self.inner.lock();
1492        guard
1493            .scheduled
1494            .retain(|(group, _)| *group != compaction_group);
1495        guard.dynamic_cooldown.remove(&compaction_group);
1496        guard.last_new_data_time.remove(&compaction_group);
1497    }
1498}
1499
1500impl Compaction {
1501    pub fn get_compact_task_assignments_by_group_id(
1502        &self,
1503        compaction_group_id: CompactionGroupId,
1504    ) -> Vec<CompactTaskAssignment> {
1505        self.compact_task_assignment
1506            .iter()
1507            .filter_map(|(_, assignment)| {
1508                if assignment
1509                    .compact_task
1510                    .as_ref()
1511                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
1512                {
1513                    Some(CompactTaskAssignment {
1514                        compact_task: assignment.compact_task.clone(),
1515                        context_id: assignment.context_id,
1516                    })
1517                } else {
1518                    None
1519                }
1520            })
1521            .collect()
1522    }
1523}
1524
/// Aggregated size statistics for a single compaction group.
#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    /// Id of the compaction group these statistics describe.
    pub group_id: CompactionGroupId,
    /// Total data size of the group.
    pub group_size: u64,
    /// Per state-table data size within the group.
    pub table_statistic: BTreeMap<StateTableId, u64>,
    /// The compaction configuration of the group.
    pub compaction_group_config: CompactionGroup,
}
1532
1533/// Updates table stats caused by vnode watermark trivial reclaim compaction.
1534fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1535    table_stats: &mut PbTableStatsMap,
1536    task: &CompactTask,
1537) {
1538    if task.task_type != TaskType::VnodeWatermark {
1539        return;
1540    }
1541    let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
1542    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1543        assert_eq!(s.table_ids.len(), 1);
1544        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1545        *e += s.total_key_count;
1546    }
1547    for (table_id, delete_count) in deleted_table_keys {
1548        let Some(stats) = table_stats.get_mut(&table_id) else {
1549            continue;
1550        };
1551        if stats.total_key_count == 0 {
1552            continue;
1553        }
1554        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1555        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1556        // total_key_count is updated accurately.
1557        stats.total_key_count = new_total_key_count;
1558        // others are updated approximately.
1559        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1560        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1561    }
1562}
1563
/// Health state of a compaction group, as derived from its L0 shape and its
/// compaction-config thresholds (see `GroupStateValidator`).
#[derive(Debug, Clone)]
pub enum GroupState {
    /// The compaction group is not in emergency state.
    Normal,

    /// The compaction group is in emergency state.
    /// The payload is a human-readable reason.
    Emergency(String), // reason

    /// The compaction group is in write stop state.
    /// The payload is a human-readable reason.
    WriteStop(String), // reason
}
1575
1576impl GroupState {
1577    pub fn is_write_stop(&self) -> bool {
1578        matches!(self, Self::WriteStop(_))
1579    }
1580
1581    pub fn is_emergency(&self) -> bool {
1582        matches!(self, Self::Emergency(_))
1583    }
1584
1585    pub fn reason(&self) -> Option<&str> {
1586        match self {
1587            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
1588            _ => None,
1589        }
1590    }
1591}
1592
/// Stateless helper that classifies a compaction group's L0 shape into a
/// `GroupState` according to the thresholds in its `CompactionConfig`.
#[derive(Clone, Default)]
pub struct GroupStateValidator;
1595
1596impl GroupStateValidator {
1597    pub fn write_stop_sub_level_count(
1598        level_count: usize,
1599        compaction_config: &CompactionConfig,
1600    ) -> bool {
1601        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1602        level_count > threshold
1603    }
1604
1605    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1606        l0_size
1607            > compaction_config
1608                .level0_stop_write_threshold_max_size
1609                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1610    }
1611
1612    pub fn write_stop_l0_file_count(
1613        l0_file_count: usize,
1614        compaction_config: &CompactionConfig,
1615    ) -> bool {
1616        l0_file_count
1617            > compaction_config
1618                .level0_stop_write_threshold_max_sst_count
1619                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1620                as usize
1621    }
1622
1623    pub fn emergency_l0_file_count(
1624        l0_file_count: usize,
1625        compaction_config: &CompactionConfig,
1626    ) -> bool {
1627        l0_file_count
1628            > compaction_config
1629                .emergency_level0_sst_file_count
1630                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1631                as usize
1632    }
1633
1634    pub fn emergency_l0_partition_count(
1635        last_l0_sub_level_partition_count: usize,
1636        compaction_config: &CompactionConfig,
1637    ) -> bool {
1638        last_l0_sub_level_partition_count
1639            > compaction_config
1640                .emergency_level0_sub_level_partition
1641                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1642                as usize
1643    }
1644
1645    pub fn check_single_group_write_stop(
1646        levels: &Levels,
1647        compaction_config: &CompactionConfig,
1648    ) -> GroupState {
1649        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1650            return GroupState::WriteStop(format!(
1651                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1652                levels.l0.sub_levels.len(),
1653                compaction_config.level0_stop_write_threshold_sub_level_number
1654            ));
1655        }
1656
1657        if Self::write_stop_l0_file_count(
1658            levels
1659                .l0
1660                .sub_levels
1661                .iter()
1662                .map(|l| l.table_infos.len())
1663                .sum(),
1664            compaction_config,
1665        ) {
1666            return GroupState::WriteStop(format!(
1667                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1668                levels
1669                    .l0
1670                    .sub_levels
1671                    .iter()
1672                    .map(|l| l.table_infos.len())
1673                    .sum::<usize>(),
1674                compaction_config
1675                    .level0_stop_write_threshold_max_sst_count
1676                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1677            ));
1678        }
1679
1680        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1681            return GroupState::WriteStop(format!(
1682                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1683                levels.l0.total_file_size,
1684                compaction_config
1685                    .level0_stop_write_threshold_max_size
1686                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1687            ));
1688        }
1689
1690        GroupState::Normal
1691    }
1692
1693    pub fn check_single_group_emergency(
1694        levels: &Levels,
1695        compaction_config: &CompactionConfig,
1696    ) -> GroupState {
1697        if Self::emergency_l0_file_count(
1698            levels
1699                .l0
1700                .sub_levels
1701                .iter()
1702                .map(|l| l.table_infos.len())
1703                .sum(),
1704            compaction_config,
1705        ) {
1706            return GroupState::Emergency(format!(
1707                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1708                levels
1709                    .l0
1710                    .sub_levels
1711                    .iter()
1712                    .map(|l| l.table_infos.len())
1713                    .sum::<usize>(),
1714                compaction_config
1715                    .emergency_level0_sst_file_count
1716                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1717            ));
1718        }
1719
1720        if Self::emergency_l0_partition_count(
1721            levels
1722                .l0
1723                .sub_levels
1724                .first()
1725                .map(|l| l.table_infos.len())
1726                .unwrap_or(0),
1727            compaction_config,
1728        ) {
1729            return GroupState::Emergency(format!(
1730                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1731                levels
1732                    .l0
1733                    .sub_levels
1734                    .first()
1735                    .map(|l| l.table_infos.len())
1736                    .unwrap_or(0),
1737                compaction_config
1738                    .emergency_level0_sub_level_partition
1739                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1740            ));
1741        }
1742
1743        GroupState::Normal
1744    }
1745
1746    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1747        let state = Self::check_single_group_write_stop(levels, compaction_config);
1748        if state.is_write_stop() {
1749            return state;
1750        }
1751
1752        Self::check_single_group_emergency(levels, compaction_config)
1753    }
1754}
1755
#[cfg(test)]
mod compaction_state_tests {
    //! Unit tests for `CompactionState` scheduling: duplicate suppression,
    //! dynamic-compaction cooldown, snapshot-based unscheduling, group
    //! removal, and task-type picking. The schedule/unschedule sequences in
    //! each test are order-sensitive: every call mutates shared state.

    use risingwave_pb::hummock::compact_task::TaskType;

    use super::*;

    // Scheduling is keyed by (group, task type): duplicates are rejected,
    // and unscheduling one type leaves the other scheduled.
    #[test]
    fn test_basic_schedule_and_unschedule() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // First schedule should succeed
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        // Duplicate schedule should fail
        assert!(!state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        // Different task type should succeed
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));

        // Snapshot should contain both
        let snapshot = state.snapshot();
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Ttl)));

        // Unschedule removes from scheduled set
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        let snapshot2 = state.snapshot();
        assert!(!snapshot2.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot2.scheduled.contains(&(group_id, TaskType::Ttl)));
    }

    // A group that was unscheduled without new data lands in cooldown, which
    // blocks further Periodic (but not NewData) Dynamic scheduling.
    #[test]
    fn test_cooldown_blocks_periodic_trigger() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Schedule then unschedule - should add to cooldown
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        // Verify in cooldown
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        // Periodic trigger should be blocked
        assert!(!state.try_sched_compaction(
            group_id,
            TaskType::Dynamic,
            ScheduleTrigger::Periodic
        ));
    }

    #[test]
    fn test_new_data_clears_cooldown() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Put group in cooldown
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        // NewData trigger should clear cooldown and schedule
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id));
    }

    // Cooldown is specific to TaskType::Dynamic; other task types neither
    // enter cooldown nor are blocked by it.
    #[test]
    fn test_cooldown_only_affects_dynamic_type() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Put group in cooldown for Dynamic
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        // Ttl unschedule should NOT add to cooldown
        let group_id_2: CompactionGroupId = 2.into();
        assert!(state.try_sched_compaction(group_id_2, TaskType::Ttl, ScheduleTrigger::Periodic));
        let snapshot2 = state.snapshot();
        state.unschedule(group_id_2, TaskType::Ttl, snapshot2.snapshot_time());
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id_2));

        // Other task types should work regardless of cooldown
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        assert!(state.try_sched_compaction(
            group_id,
            TaskType::SpaceReclaim,
            ScheduleTrigger::Periodic
        ));
    }

    // Data arriving after the snapshot was taken must veto the cooldown,
    // otherwise fresh data could be starved of dynamic compaction.
    #[test]
    fn test_race_condition_new_data_after_snapshot() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();

        // Simulate new data arriving AFTER snapshot
        {
            let mut guard = state.inner.lock();
            guard.last_new_data_time.insert(group_id, Instant::now());
        }

        // Unschedule should NOT add to cooldown (new data arrived after snapshot)
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(
            !state.inner.lock().dynamic_cooldown.contains(&group_id),
            "Should skip cooldown when new data arrived after snapshot"
        );
    }

    #[test]
    fn test_remove_compaction_group_cleans_all_state() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Set up state
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        state.inner.lock().dynamic_cooldown.insert(group_id);

        // Remove group
        state.remove_compaction_group(group_id);

        // Verify all state cleaned up
        let guard = state.inner.lock();
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Ttl)));
        assert!(!guard.dynamic_cooldown.contains(&group_id));
        assert!(!guard.last_new_data_time.contains_key(&group_id));
    }

    // Schedules types from lowest to highest priority and checks that each
    // newly added type immediately wins the pick.
    #[test]
    fn test_snapshot_pick_type_priority() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Empty group returns None
        assert_eq!(state.snapshot().pick_type(group_id), None);

        // Priority order: Dynamic > SpaceReclaim > Ttl > Tombstone > VnodeWatermark
        state.try_sched_compaction(
            group_id,
            TaskType::VnodeWatermark,
            ScheduleTrigger::Periodic,
        );
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::VnodeWatermark)
        );

        state.try_sched_compaction(group_id, TaskType::Tombstone, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Tombstone)
        );

        state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic);
        assert_eq!(state.snapshot().pick_type(group_id), Some(TaskType::Ttl));

        state.try_sched_compaction(group_id, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::SpaceReclaim)
        );

        state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Dynamic)
        );
    }

    #[test]
    fn test_multiple_groups_independent_cooldown() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);
        let snapshot = state.snapshot();

        // Only unschedule g1
        state.unschedule(g1, TaskType::Dynamic, snapshot.snapshot_time());

        let guard = state.inner.lock();
        assert!(guard.dynamic_cooldown.contains(&g1));
        assert!(!guard.dynamic_cooldown.contains(&g2));
    }

    #[test]
    fn test_pick_compaction_groups_empty() {
        let state = CompactionState::new();
        let snapshot = state.snapshot();
        // No scheduled groups → returns None
        assert!(snapshot.pick_compaction_groups_and_type().is_none());
    }

    // Picking shuffles groups, so the outcome is one of two valid results;
    // the assertions accept either.
    #[test]
    fn test_pick_compaction_groups_mixed_types() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();
        let g3: CompactionGroupId = 3.into();

        // g1: Dynamic, g2: Ttl, g3: Dynamic
        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Ttl, ScheduleTrigger::Periodic);
        state.try_sched_compaction(g3, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();

        // Due to shuffle, either:
        // - Ttl group is encountered first → returns (vec![g2], Ttl)
        // - Dynamic group is encountered first → collects all Dynamic, skips Ttl
        //   → returns ([g1, g3] in some order, Dynamic)
        if task_type == TaskType::Dynamic {
            assert!(groups.contains(&g1));
            assert!(groups.contains(&g3));
            assert!(!groups.contains(&g2)); // Ttl group excluded from Dynamic result
        } else {
            assert_eq!(task_type, TaskType::Ttl);
            assert_eq!(groups, vec![g2]);
        }
    }

    #[test]
    fn test_pick_compaction_groups_all_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::Dynamic);
        assert!(groups.contains(&g1));
        assert!(groups.contains(&g2));
    }

    #[test]
    fn test_pick_compaction_groups_single_non_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();

        state.try_sched_compaction(g1, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::SpaceReclaim);
        assert_eq!(groups, vec![g1]);
    }
}