risingwave_meta/hummock/manager/compaction/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17use std::time::Instant;
18
19use anyhow::Context;
20use compaction_event_loop::{
21    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
22    HummockCompactorDedicatedEventLoop,
23};
24use fail::fail_point;
25use itertools::Itertools;
26use parking_lot::Mutex;
27use rand::rng as thread_rng;
28use rand::seq::SliceRandom;
29use risingwave_common::catalog::TableId;
30use risingwave_common::config::meta::default::compaction_config;
31use risingwave_common::util::epoch::Epoch;
32use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
33use risingwave_hummock_sdk::compaction_group::StateTableId;
34use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_table_watermarks_impl;
35use risingwave_hummock_sdk::level::Levels;
36use risingwave_hummock_sdk::sstable_info::SstableInfo;
37use risingwave_hummock_sdk::table_stats::{
38    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
39};
40use risingwave_hummock_sdk::table_watermark::TableWatermarks;
41use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
42use risingwave_hummock_sdk::{
43    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
44    HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
45};
46use risingwave_meta_model::hummock_sequence::COMPACTION_TASK_ID;
47use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
48use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
49use risingwave_pb::hummock::{
50    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
51    SubscribeCompactionEventRequest, TableOption, compact_task,
52};
53use thiserror_ext::AsReport;
54use tokio::sync::RwLockWriteGuard;
55use tokio::sync::mpsc::UnboundedReceiver;
56use tokio::sync::oneshot::{Receiver, Sender};
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use crate::hummock::compaction::selector::level_selector::PickerInfo;
62use crate::hummock::compaction::selector::{
63    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
64    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
65    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
66};
67use crate::hummock::compaction::{
68    CompactStatus, CompactionDeveloperConfig, CompactionSelector,
69    CompactionTask as PickedCompactionTask,
70};
71use crate::hummock::error::{Error, Result};
72use crate::hummock::manager::CompactionTaskReportResult;
73use crate::hummock::manager::compaction::compact_task_builder::{
74    CompactTaskBuildContext, attach_compact_task_table_metadata, build_base_compact_task,
75};
76use crate::hummock::manager::transaction::{
77    HummockVersionStatsTransaction, HummockVersionTransaction,
78};
79use crate::hummock::manager::versioning::Versioning;
80use crate::hummock::metrics_utils::{
81    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
82    trigger_local_table_stat,
83};
84use crate::hummock::model::CompactionGroup;
85use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
86use crate::manager::META_NODE_ID;
87use crate::model::BTreeMapTransaction;
88
/// Outcome of an attempt to trigger a manual compaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ManualCompactionTriggerResult {
    /// The manual compaction task was submitted.
    Submitted,
    /// No task was submitted this round; per the variant name the caller is
    /// expected to retry. NOTE(review): exact retry semantics are defined by
    /// the callers, which are outside this chunk — confirm there.
    Retry,
}
94
// Submodules of the compaction manager.
mod compact_task_builder;
pub mod compaction_event_loop;
pub mod compaction_group_manager;
pub mod compaction_group_schedule;
99
100static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
101    [
102        TaskStatus::ManualCanceled,
103        TaskStatus::SendFailCanceled,
104        TaskStatus::AssignFailCanceled,
105        TaskStatus::HeartbeatCanceled,
106        TaskStatus::InvalidGroupCanceled,
107        TaskStatus::NoAvailMemoryResourceCanceled,
108        TaskStatus::NoAvailCpuResourceCanceled,
109        TaskStatus::HeartbeatProgressCanceled,
110    ]
111    .into_iter()
112    .collect()
113});
114
115fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
116    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
117        HashMap::default();
118    compaction_selectors.insert(
119        compact_task::TaskType::Dynamic,
120        Box::<DynamicLevelSelector>::default(),
121    );
122    compaction_selectors.insert(
123        compact_task::TaskType::SpaceReclaim,
124        Box::<SpaceReclaimCompactionSelector>::default(),
125    );
126    compaction_selectors.insert(
127        compact_task::TaskType::Ttl,
128        Box::<TtlCompactionSelector>::default(),
129    );
130    compaction_selectors.insert(
131        compact_task::TaskType::Tombstone,
132        Box::<TombstoneCompactionSelector>::default(),
133    );
134    compaction_selectors.insert(
135        compact_task::TaskType::VnodeWatermark,
136        Box::<VnodeWatermarkCompactionSelector>::default(),
137    );
138    compaction_selectors
139}
140
/// Result of turning a picked compaction into a concrete `CompactTask`.
enum BuiltCompactTask {
    /// The task was fully resolved on the meta node (in `get_compact_tasks_impl`
    /// such tasks are applied to the version immediately and counted as
    /// trivial tasks) — no compactor assignment is needed.
    MetaFinished(CompactTask),
    /// The task still needs to be assigned to a compactor for execution.
    PendingAssignment(CompactTask),
}
145
146impl HummockVersionTransaction<'_> {
147    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
148        let mut version_delta = self.new_delta();
149        let trivial_move = compact_task.is_trivial_move_task();
150        version_delta.trivial_move = trivial_move;
151
152        let group_deltas = &mut version_delta
153            .group_deltas
154            .entry(compact_task.compaction_group_id)
155            .or_default()
156            .group_deltas;
157        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
158            BTreeMap::default();
159
160        for level in &compact_task.input_ssts {
161            let level_idx = level.level_idx;
162
163            removed_table_ids_map
164                .entry(level_idx)
165                .or_default()
166                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
167        }
168
169        for (level_idx, removed_table_ids) in removed_table_ids_map {
170            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
171                level_idx,
172                0, // default
173                removed_table_ids,
174                vec![], // default
175                0,      // default
176                compact_task.compaction_group_version_id,
177            ));
178
179            group_deltas.push(group_delta);
180        }
181
182        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
183            compact_task.target_level,
184            compact_task.target_sub_level_id,
185            HashSet::new(), // default
186            compact_task.sorted_output_ssts.clone(),
187            compact_task.split_weight_by_vnode,
188            compact_task.compaction_group_version_id,
189        ));
190
191        group_deltas.push(group_delta);
192        version_delta.pre_apply();
193    }
194}
195
/// In-memory bookkeeping of compaction state held by the meta node.
#[derive(Default)]
pub struct Compaction {
    /// Compaction task that is already assigned to a compactor
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// `CompactStatus` of each compaction group
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    // Presumably tied to deterministic-test mode; the leading underscore marks
    // it as unread in this file — TODO confirm against other users.
    pub _deterministic_mode: bool,
}
205
206impl HummockManager {
207    pub async fn get_assigned_compact_task_num(&self) -> u64 {
208        self.compaction.read().await.compact_task_assignment.len() as u64
209    }
210
211    pub async fn list_compaction_status(
212        &self,
213    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
214        let compaction = self.compaction.read().await;
215        (
216            compaction.compaction_statuses.values().map_into().collect(),
217            compaction
218                .compact_task_assignment
219                .values()
220                .cloned()
221                .collect(),
222        )
223    }
224
225    pub async fn get_compaction_scores(
226        &self,
227        compaction_group_id: CompactionGroupId,
228    ) -> Vec<PickerInfo> {
229        let (status, levels, group) = {
230            let compaction = self.compaction.read().await;
231            let versioning = self.versioning.read().await;
232            let config_manager = self.compaction_group_manager.read().await;
233            match (
234                compaction.compaction_statuses.get(&compaction_group_id),
235                versioning.current_version.levels.get(&compaction_group_id),
236                config_manager.try_get_compaction_group_config(compaction_group_id),
237            ) {
238                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
239                _ => {
240                    return vec![];
241                }
242            }
243        };
244        let dynamic_level_core = DynamicLevelSelectorCore::new(
245            group.compaction_config,
246            Arc::new(CompactionDeveloperConfig::default()),
247        );
248        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
249        ctx.score_levels
250    }
251}
252
253impl HummockManager {
254    pub fn compaction_event_loop(
255        hummock_manager: Arc<Self>,
256        compactor_streams_change_rx: UnboundedReceiver<(
257            HummockContextId,
258            Streaming<SubscribeCompactionEventRequest>,
259        )>,
260    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
261        let mut join_handle_vec = Vec::default();
262
263        let hummock_compaction_event_handler =
264            HummockCompactionEventHandler::new(hummock_manager.clone());
265
266        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
267            hummock_manager.clone(),
268            hummock_compaction_event_handler.clone(),
269        );
270
271        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
272        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));
273
274        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
275            hummock_manager.env.opts.clone(),
276            hummock_compaction_event_handler,
277            Some(event_tx),
278        );
279
280        let event_loop = HummockCompactionEventLoop::new(
281            hummock_compaction_event_dispatcher,
282            hummock_manager.metrics.clone(),
283            compactor_streams_change_rx,
284        );
285
286        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
287        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
288
289        join_handle_vec
290    }
291
292    pub fn add_compactor_stream(
293        &self,
294        context_id: HummockContextId,
295        req_stream: Streaming<SubscribeCompactionEventRequest>,
296    ) {
297        self.compactor_streams_change_tx
298            .send((context_id, req_stream))
299            .unwrap();
300    }
301}
302
303impl HummockManager {
    /// Gets one compaction task id with best-effort batching while ensuring concurrent callers
    /// share the same refill instead of wasting an allocated range.
    ///
    /// `refill_capacity` is the chunk size requested from the sequence allocator when the
    /// in-memory cache of prefetched ids runs dry.
    async fn next_compaction_task_id_with_prefetch(&self, refill_capacity: u32) -> Result<u64> {
        self.prefetched_compaction_task_ids
            .next(refill_capacity, |count| async move {
                // Refill callback: allocate a fresh interval of task ids from
                // the persistent sequence.
                self.env
                    .hummock_seq
                    .next_interval(COMPACTION_TASK_ID, count)
                    .await
            })
            .await
    }
316
    /// Picks compaction tasks for the given `compaction_groups` with `selector`,
    /// persisting the picked state in a single meta-store transaction.
    ///
    /// Returns the picked tasks (meta-finished/trivial tasks are appended after
    /// the normal ones, still carrying their finished status) and the groups for
    /// which no normal task could be scheduled this round.
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut dyn CompactionSelector,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        // Lock order: `compaction` before `versioning` — matches
        // `report_compact_tasks`; keep it consistent to avoid deadlocks.
        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        // Table schemas are only fetched when dropped-column reclaim is enabled.
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        // Reuse prefetched task ids from previous loops.
        // Each group consumes at most one task_id (trivial tasks share the same id with normal
        // task). When prefetched ids are exhausted, refill in fixed-size chunks to avoid
        // per-group SQL transactions while keeping the in-memory cache small.
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            // Skip groups that no longer exist in the latest version.
            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // When the last table of a compaction group is deleted, the compaction group (and its
            // config) is destroyed as well. Then a compaction task for this group may come later and
            // cannot find its config.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            // Use prefetched task id if available; when exhausted, refill in chunks first.
            let task_id = self
                .next_compaction_task_id_with_prefetch(
                    self.env.opts.compaction_task_id_refill_capacity,
                )
                .await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                // lazy initialize.
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            // Per-table options for member tables, copied out of the shared map
            // under a short-lived (non-async) read guard.
            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            // Keep picking within this group until a task must be handed to a
            // compactor (`PendingAssignment` → break) or the selector yields
            // nothing. Meta-finished tasks are applied immediately and looped.
            while let Some(picked_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let compaction_group_levels = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id);
                let target_level_id = picked_task.input.target_level as u32;
                let is_target_level_last = compaction_group_levels.is_last_level(target_level_id);
                let table_options = table_id_to_option
                    .iter()
                    .map(|(table_id, table_option)| (*table_id, TableOption::from(table_option)))
                    .collect();
                let built_compact_task = self.build_ready_compact_task(
                    picked_task,
                    CompactTaskBuildContext {
                        task_id,
                        compaction_group_id: group_config.group_id,
                        compaction_group_version_id: compaction_group_levels
                            .compaction_group_version_id,
                        existing_table_ids: member_table_ids.clone(),
                        table_options,
                        is_target_level_last,
                        compaction_config: group_config.compaction_config.clone(),
                        current_epoch_time: Epoch::now().0,
                    },
                    &version.latest_version().table_watermarks,
                    &all_versioned_table_schemas,
                );

                match built_compact_task {
                    // Resolved on the meta node: apply to the in-flight version
                    // delta right away and record it as a trivial task.
                    BuiltCompactTask::MetaFinished(compact_task) => {
                        let label = compact_task.task_label();
                        tracing::debug!(
                            "{} for compaction group {}: input: {:?}, cost time: {:?}",
                            label,
                            compact_task.compaction_group_id,
                            compact_task.input_ssts,
                            start_time.elapsed()
                        );
                        compact_status.report_compact_task(&compact_task);
                        update_table_stats_for_vnode_watermark_trivial_reclaim(
                            &mut version_stats.table_stats,
                            &compact_task,
                        );
                        self.metrics
                            .compact_frequency
                            .with_label_values(&[
                                label,
                                &compact_task.compaction_group_id.to_string(),
                                selector.task_type().as_str_name(),
                                "SUCCESS",
                            ])
                            .inc();

                        version.apply_compact_task(&compact_task);
                        trivial_tasks.push(compact_task);
                        // Cap the amount of trivial work per loop iteration.
                        if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop
                        {
                            break 'outside;
                        }
                    }
                    // Needs a compactor: stage the assignment and stop picking
                    // from this group.
                    BuiltCompactTask::PendingAssignment(compact_task) => {
                        compact_task_assignment.insert(
                            compact_task.task_id,
                            CompactTaskAssignment {
                                compact_task: Some(compact_task.clone().into()),
                                context_id: META_NODE_ID, // deprecated
                            },
                        );

                        pick_tasks.push(compact_task);
                        break;
                    }
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            // If the last picked normal task is not from this group (or none was
            // picked at all), the group yielded nothing schedulable this round.
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            // Trivial tasks changed the version, so the version (and stats)
            // transactions must be committed together with the task state.
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // We are using a single transaction to ensure that each task has progress when it is
            // created.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        // Post-processing of normal tasks happens outside the locks: start
        // heartbeats, mark pending, and emit per-task metrics.
        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Initiate heartbeat for the task to track its progress.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // The task is handed out for execution; mark it pending.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        // Trivial tasks are appended last; callers filter them by task status.
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }
641
642    /// Cancels a compaction task no matter it's assigned or unassigned.
643    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
644        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
645            anyhow::anyhow!("failpoint metastore err")
646        )));
647        let ret = self
648            .cancel_compact_task_impl(vec![task_id], task_status)
649            .await?;
650        Ok(ret[0])
651    }
652
    /// Cancels a batch of compaction tasks with the given cancellation `task_status`.
    /// Returns one boolean per task; `false` presumably means the task was not
    /// found or not cancellable (see `report_compact_tasks_impl`'s contract).
    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }
660
661    async fn cancel_compact_task_impl(
662        &self,
663        task_ids: Vec<u64>,
664        task_status: TaskStatus,
665    ) -> Result<Vec<bool>> {
666        assert!(CANCEL_STATUS_SET.contains(&task_status));
667        let tasks = task_ids
668            .into_iter()
669            .map(|task_id| ReportTask {
670                task_id,
671                task_status,
672                sorted_output_ssts: vec![],
673                table_stats_change: HashMap::default(),
674                object_timestamps: HashMap::default(),
675            })
676            .collect_vec();
677        let rets = self.report_compact_tasks(tasks).await?;
678        #[cfg(test)]
679        {
680            self.check_state_consistency().await;
681        }
682        Ok(rets)
683    }
684
685    async fn get_compact_tasks(
686        &self,
687        mut compaction_groups: Vec<CompactionGroupId>,
688        max_select_count: usize,
689        selector: &mut dyn CompactionSelector,
690    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
691        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
692            anyhow::anyhow!("failpoint metastore error")
693        )));
694        compaction_groups.shuffle(&mut thread_rng());
695        let (mut tasks, groups) = self
696            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
697            .await?;
698        tasks.retain(|task| {
699            if task.task_status == TaskStatus::Success {
700                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
701                false
702            } else {
703                true
704            }
705        });
706        Ok((tasks, groups))
707    }
708
709    pub async fn get_compact_task(
710        &self,
711        compaction_group_id: CompactionGroupId,
712        selector: &mut dyn CompactionSelector,
713    ) -> Result<Option<CompactTask>> {
714        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
715            anyhow::anyhow!("failpoint metastore error")
716        )));
717
718        let (normal_tasks, _) = self
719            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
720            .await?;
721        for task in normal_tasks {
722            if task.task_status != TaskStatus::Success {
723                return Ok(Some(task));
724            }
725            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
726        }
727        Ok(None)
728    }
729
730    pub async fn manual_get_compact_task(
731        &self,
732        compaction_group_id: CompactionGroupId,
733        manual_compaction_option: ManualCompactionOption,
734    ) -> Result<Option<CompactTask>> {
735        let (task, _) = self
736            .manual_get_compact_task_with_info(compaction_group_id, manual_compaction_option)
737            .await?;
738        Ok(task)
739    }
740
741    pub async fn manual_get_compact_task_with_info(
742        &self,
743        compaction_group_id: CompactionGroupId,
744        manual_compaction_option: ManualCompactionOption,
745    ) -> Result<(Option<CompactTask>, bool)> {
746        let mut selector = ManualCompactionSelector::new(manual_compaction_option);
747        let task = self
748            .get_compact_task(compaction_group_id, &mut selector)
749            .await?;
750        Ok((task, selector.blocked_by_pending()))
751    }
752
753    pub async fn report_compact_task(
754        &self,
755        task_id: u64,
756        task_status: TaskStatus,
757        sorted_output_ssts: Vec<SstableInfo>,
758        table_stats_change: Option<PbTableStatsMap>,
759        object_timestamps: HashMap<HummockSstableObjectId, u64>,
760    ) -> Result<bool> {
761        let rets = self
762            .report_compact_tasks(vec![ReportTask {
763                task_id,
764                task_status,
765                sorted_output_ssts,
766                table_stats_change: table_stats_change.unwrap_or_default(),
767                object_timestamps,
768            }])
769            .await?;
770        Ok(rets[0])
771    }
772
773    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
774        let compaction_guard = self.compaction.write().await;
775        let versioning_guard = self.versioning.write().await;
776
777        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
778            .await
779    }
780
    /// Finishes or cancels a batch of compaction tasks, according to each
    /// task's `task_status`, and commits the resulting status / version /
    /// stats changes to the meta store in one transaction.
    ///
    /// Returns one bool per input task: `false` indicates the task was not
    /// found in the current assignments (e.g. already reported or cancelled).
    pub async fn report_compact_tasks_impl(
        &self,
        report_tasks: Vec<ReportTask>,
        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
    ) -> Result<Vec<bool>> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;
        let compaction: &mut Compaction = &mut compaction_guard;
        let start_time = Instant::now();
        // Snapshot the group ids before opening the transaction, so stale
        // entries can be purged below.
        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
        // One result slot per input task; flipped to `true` once the task is found.
        let mut rets = vec![false; report_tasks.len()];
        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
        // The compaction task is finished.
        let versioning: &mut Versioning = &mut versioning_guard;
        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");

        // purge stale compact_status: drop status entries for groups that no
        // longer exist in the current version.
        for group_id in original_keys {
            if !versioning.current_version.levels.contains_key(&group_id) {
                compact_statuses.remove(group_id);
            }
        }
        // Tasks processed in this batch, used for post-commit bookkeeping.
        let mut tasks = vec![];

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );

        // In deterministic test mode the version delta is not applied to the
        // transaction.
        if deterministic_mode {
            version.disable_apply_to_txn();
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        let mut success_count = 0;
        let mut report_results = Vec::with_capacity(rets.len());
        for (idx, task) in report_tasks.into_iter().enumerate() {
            rets[idx] = true;
            let task_id = task.task_id;
            let mut task_status = task.task_status;
            // Removing the assignment also releases the task; unknown tasks
            // are reported back as not-found.
            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
                None => {
                    tracing::warn!("{}", format!("compact task {} not found", task.task_id));
                    rets[idx] = false;
                    report_results.push(CompactionTaskReportResult {
                        task_id,
                        task_status,
                        reported: false,
                    });
                    continue;
                }
            };

            {
                // apply result reported by the compactor
                compact_task.task_status = task.task_status;
                compact_task.sorted_output_ssts = task.sorted_output_ssts;
            }

            match compact_statuses.get_mut(compact_task.compaction_group_id) {
                Some(mut compact_status) => {
                    compact_status.report_compact_task(&compact_task);
                }
                None => {
                    // When the group_id is not found in the compaction_statuses, it means the group has been removed.
                    // The task is invalid and should be canceled.
                    // e.g.
                    // 1. The group is removed by the user unregistering the tables
                    // 2. The group is removed by the group scheduling algorithm
                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
                }
            }

            // A task only counts as successful if it passes the sanity check
            // and its inputs have not been invalidated by a group change.
            let is_success = if let TaskStatus::Success = compact_task.task_status {
                match self
                    .report_compaction_sanity_check(&task.object_timestamps)
                    .await
                {
                    Err(e) => {
                        warn!(
                            "failed to commit compaction task {} {}",
                            compact_task.task_id,
                            e.as_report()
                        );
                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
                        false
                    }
                    _ => {
                        let group = version
                            .latest_version()
                            .levels
                            .get(&compact_task.compaction_group_id)
                            .unwrap();
                        // The group version changes when the group is split;
                        // a task picked against an older group version is stale.
                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
                        if is_expired {
                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
                            warn!(
                                "The task may be expired because of group split, task:\n {:?}",
                                compact_task_to_string(&compact_task)
                            );
                        }
                        !is_expired
                    }
                }
            } else {
                false
            };
            if is_success {
                success_count += 1;
                version.apply_compact_task(&compact_task);
                // Drop stats of tables that no longer exist; when anything was
                // purged, reset the exported metrics so they are rebuilt.
                if purge_prost_table_stats(
                    &mut version_stats.table_stats,
                    version.latest_version(),
                    &HashSet::default(),
                ) {
                    self.metrics.version_stats.reset();
                    versioning.local_metrics.clear();
                }
                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
                trigger_local_table_stat(
                    &self.metrics,
                    &mut versioning.local_metrics,
                    &version_stats,
                    &task.table_stats_change,
                );
            }
            // Use the possibly-rewritten status (cancel/reject above) in the
            // result delivered to waiters.
            task_status = compact_task.task_status;
            report_results.push(CompactionTaskReportResult {
                task_id,
                task_status,
                reported: rets[idx],
            });
            tasks.push(compact_task);
        }
        if success_count > 0 {
            // At least one task changed the version: commit version and stats
            // together with the status/assignment changes.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;

            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_report_task"])
                .observe(success_count as f64);
        } else {
            // The compaction task is cancelled or failed: only the status and
            // assignment maps need to be persisted.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment
            )?;
        }

        // Wake any callers blocked on these task ids (e.g. manual compaction).
        self.notify_compaction_task_report_waiters(report_results);

        let mut success_groups = vec![];
        for compact_task in &tasks {
            // The task is finalized either way; stop tracking its heartbeat.
            self.compactor_manager
                .remove_task_heartbeat(compact_task.task_id);
            tracing::trace!(
                "Reported compaction task. {}. cost time: {:?}",
                compact_task_to_string(compact_task),
                start_time.elapsed(),
            );

            if !deterministic_mode
                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
            {
                // only try send Dynamic compaction
                self.try_send_compaction_request(
                    compact_task.compaction_group_id,
                    compact_task::TaskType::Dynamic,
                );
            }

            if compact_task.task_status == TaskStatus::Success {
                success_groups.push(compact_task.compaction_group_id);
            }
        }

        trigger_compact_tasks_stat(
            &self.metrics,
            &tasks,
            &compaction.compaction_statuses,
            &versioning_guard.current_version,
        );
        // Release the versioning lock before the (awaiting) write-limit update.
        drop(versioning_guard);
        if !success_groups.is_empty() {
            self.try_update_write_limits(&success_groups).await;
        }
        Ok(rets)
    }
995
996    /// Triggers compacitons to specified compaction groups.
997    /// Don't wait for compaction finish
998    pub async fn trigger_compaction_deterministic(
999        &self,
1000        _base_version_id: HummockVersionId,
1001        compaction_groups: Vec<CompactionGroupId>,
1002    ) -> Result<()> {
1003        self.on_current_version(|old_version| {
1004            tracing::info!(
1005                "Trigger compaction for version {}, groups {:?}",
1006                old_version.id,
1007                compaction_groups
1008            );
1009        })
1010        .await;
1011
1012        if compaction_groups.is_empty() {
1013            return Ok(());
1014        }
1015        for compaction_group in compaction_groups {
1016            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1017        }
1018        Ok(())
1019    }
1020
    /// Triggers one manual compaction task for `compaction_group` and waits
    /// until a compactor reports its result.
    ///
    /// Returns `Retry` when the caller should try again later: either task
    /// selection was blocked by pending tasks in exclusive mode, or the
    /// compactor cancelled for lack of CPU/memory resources. Returns
    /// `Submitted` once a task completed its report cycle. All other failure
    /// modes (no compactor, no task, send/report failure) are errors.
    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<ManualCompactionTriggerResult> {
        let start_time = Instant::now();
        let exclusive = manual_compaction_option.exclusive;

        // 1. Get idle compactor.
        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction No compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        // 2. Get manual compaction task.
        let compact_task = self
            .manual_get_compact_task_with_info(compaction_group, manual_compaction_option)
            .await;
        let (compact_task, blocked_by_pending) = match compact_task {
            Ok((compact_task, blocked_by_pending)) => (compact_task, blocked_by_pending),
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };
        let compact_task = match compact_task {
            Some(compact_task) => compact_task,
            None => {
                // Exclusive mode: if selection was blocked by pending tasks,
                // the caller should retry instead of treating this as failure.
                if exclusive && blocked_by_pending {
                    return Ok(ManualCompactionTriggerResult::Retry);
                }
                // No compaction task available.
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        // 3. send task to compactor
        let task_id = compact_task.task_id;
        let compact_task_string = compact_task_to_string(&compact_task);
        tracing::info!(
            compact_task_string,
            duration = ?start_time.elapsed(),
            "Triggered manual compaction task."
        );

        // Register the waiter BEFORE sending so the report cannot race past us;
        // on any failure below the waiter is removed again to avoid leaks.
        let report_rx = self.register_compaction_task_report_waiter(task_id);
        if let Err(err) = compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })
        {
            self.remove_compaction_task_report_waiter(task_id);
            return Err(err.into());
        }

        // 4. Wait for the task to be reported back.
        let report_result = match report_rx.await {
            Ok(result) => result,
            Err(_) => {
                // Sender dropped without a report; clean up our registration.
                self.remove_compaction_task_report_waiter(task_id);
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction wait report failed. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };
        if !report_result.reported {
            // The report path saw the task id but could not find/accept the task.
            return Err(anyhow::anyhow!(
                "trigger_manual_compaction report not accepted. task_id {}",
                report_result.task_id
            )
            .into());
        }

        // Resource-exhaustion cancellations are transient: signal a retry.
        if report_result.task_status == TaskStatus::NoAvailCpuResourceCanceled
            || report_result.task_status == TaskStatus::NoAvailMemoryResourceCanceled
        {
            return Ok(ManualCompactionTriggerResult::Retry);
        }

        tracing::info!(
            ?report_result,
            duration = ?start_time.elapsed(),
            "Completed manual compaction task."
        );

        Ok(ManualCompactionTriggerResult::Submitted)
    }
1130
1131    /// Sends a compaction request for new data (clears cooldown).
1132    pub fn try_send_compaction_request(
1133        &self,
1134        compaction_group: CompactionGroupId,
1135        task_type: compact_task::TaskType,
1136    ) -> bool {
1137        self.compaction_state.try_sched_compaction(
1138            compaction_group,
1139            task_type,
1140            ScheduleTrigger::NewData,
1141        )
1142    }
1143
1144    /// Apply `split_weight_by_vnode` based partition strategy.
1145    /// This handles dynamic partitioning based on table size and write throughput.
1146    fn apply_split_weight_by_vnode_partition(
1147        &self,
1148        compact_task: &mut CompactTask,
1149        compaction_config: &CompactionConfig,
1150        compact_table_ids: &[TableId],
1151    ) {
1152        if compaction_config.split_weight_by_vnode > 0 {
1153            for table_id in compact_table_ids {
1154                compact_task
1155                    .table_vnode_partition
1156                    .insert(*table_id, compact_task.split_weight_by_vnode);
1157            }
1158
1159            return;
1160        }
1161
1162        // Calculate per-table size from normalized input SSTs.
1163        let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
1164        for input_ssts in &compact_task.input_ssts {
1165            for sst in &input_ssts.table_infos {
1166                for table_id in &sst.table_ids {
1167                    *table_size_info.entry(*table_id).or_default() +=
1168                        sst.sst_size / (sst.table_ids.len() as u64);
1169                }
1170            }
1171        }
1172
1173        let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
1174        let default_partition_count = self.env.opts.partition_vnode_count;
1175        let compact_task_table_size_partition_threshold_low = self
1176            .env
1177            .opts
1178            .compact_task_table_size_partition_threshold_low;
1179        let compact_task_table_size_partition_threshold_high = self
1180            .env
1181            .opts
1182            .compact_task_table_size_partition_threshold_high;
1183
1184        // Check latest write throughput
1185        let table_write_throughput_statistic_manager =
1186            self.table_write_throughput_statistic_manager.read();
1187        let timestamp = chrono::Utc::now().timestamp();
1188
1189        for (table_id, compact_table_size) in table_size_info {
1190            let write_throughput = table_write_throughput_statistic_manager
1191                .get_table_throughput_descending(table_id, timestamp)
1192                .peekable()
1193                .peek()
1194                .map(|item| item.throughput)
1195                .unwrap_or(0);
1196
1197            if compact_table_size > compact_task_table_size_partition_threshold_high
1198                && default_partition_count > 0
1199            {
1200                compact_task
1201                    .table_vnode_partition
1202                    .insert(table_id, default_partition_count);
1203            } else if (compact_table_size > compact_task_table_size_partition_threshold_low
1204                || (write_throughput > self.env.opts.table_high_write_throughput_threshold
1205                    && compact_table_size > compaction_config.target_file_size_base))
1206                && hybrid_vnode_count > 0
1207            {
1208                compact_task
1209                    .table_vnode_partition
1210                    .insert(table_id, hybrid_vnode_count);
1211            } else if compact_table_size > compaction_config.target_file_size_base {
1212                compact_task.table_vnode_partition.insert(table_id, 1);
1213            }
1214        }
1215
1216        compact_task
1217            .table_vnode_partition
1218            .retain(|table_id, _| compact_table_ids.contains(table_id));
1219    }
1220
1221    pub(crate) fn calculate_vnode_partition(
1222        &self,
1223        compact_task: &mut CompactTask,
1224        compaction_config: &CompactionConfig,
1225        compact_table_ids: &[TableId],
1226    ) {
1227        // Do not split sst by vnode partition when target_level > base_level
1228        // The purpose of data alignment is mainly to improve the parallelism of base level compaction
1229        // and reduce write amplification. However, at high level, the size of the sst file is often
1230        // larger and only contains the data of a single table_id, so there is no need to cut it.
1231        if compact_task.target_level > compact_task.base_level {
1232            return;
1233        }
1234
1235        // Apply split_weight_by_vnode based partition strategy
1236        self.apply_split_weight_by_vnode_partition(
1237            compact_task,
1238            compaction_config,
1239            compact_table_ids,
1240        );
1241    }
1242
1243    fn build_ready_compact_task(
1244        &self,
1245        picked_task: PickedCompactionTask,
1246        context: CompactTaskBuildContext,
1247        table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
1248        all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
1249    ) -> BuiltCompactTask {
1250        let compaction_config = context.compaction_config.clone();
1251        let (mut compact_task, compact_table_ids) = build_base_compact_task(picked_task, context);
1252
1253        if compact_task.is_trivial_reclaim() {
1254            compact_task.task_status = TaskStatus::Success;
1255            compact_task.sorted_output_ssts.clear();
1256            return BuiltCompactTask::MetaFinished(compact_task);
1257        }
1258
1259        if compact_task.is_trivial_move_task() {
1260            compact_task.task_status = TaskStatus::Success;
1261            compact_task.sorted_output_ssts = compact_task.input_ssts[0]
1262                .read_sstable_infos()
1263                .cloned()
1264                .collect();
1265            return BuiltCompactTask::MetaFinished(compact_task);
1266        }
1267
1268        self.prepare_compact_task_for_assignment(
1269            &mut compact_task,
1270            compaction_config.as_ref(),
1271            &compact_table_ids,
1272            safe_epoch_table_watermarks_impl(table_watermarks, &compact_table_ids),
1273            all_versioned_table_schemas,
1274        );
1275
1276        BuiltCompactTask::PendingAssignment(compact_task)
1277    }
1278
    /// Runs the final preparation steps on a task before it can be assigned
    /// to a compactor: decides the vnode partition layout, then attaches the
    /// per-table metadata (watermarks and versioned schemas) to the task.
    fn prepare_compact_task_for_assignment(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
        compact_table_ids: &[TableId],
        table_watermarks: BTreeMap<TableId, TableWatermarks>,
        all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
    ) {
        self.calculate_vnode_partition(compact_task, compaction_config, compact_table_ids);
        attach_compact_task_table_metadata(
            compact_task,
            compact_table_ids,
            table_watermarks,
            all_versioned_table_schemas,
        );
    }
1295
    /// Returns a cloned shared handle to the compactor manager.
    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
1299
1300    fn register_compaction_task_report_waiter(
1301        &self,
1302        task_id: HummockCompactionTaskId,
1303    ) -> Receiver<CompactionTaskReportResult> {
1304        let (tx, rx) = tokio::sync::oneshot::channel();
1305        self.compaction_task_report_notifiers
1306            .lock()
1307            .register(task_id, tx);
1308        rx
1309    }
1310
    /// Drops the registered waiter for `task_id`, e.g. when sending the task
    /// or awaiting its report failed.
    fn remove_compaction_task_report_waiter(&self, task_id: HummockCompactionTaskId) {
        self.compaction_task_report_notifiers.lock().remove(task_id);
    }
1314
1315    fn notify_compaction_task_report_waiters(&self, results: Vec<CompactionTaskReportResult>) {
1316        let mut guard = self.compaction_task_report_notifiers.lock();
1317        for result in results {
1318            guard.notify(result);
1319        }
1320    }
1321}
1322
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Test helper: looks up the current assignment for `task_id`, if any.
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        self.compaction
            .read()
            .await
            .compact_task_assignment
            .get(&task_id)
            .cloned()
    }

    /// Test helper: optionally (re)installs `compact_task` as the assignment
    /// for `task_id`, then reports it through the normal report path.
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let assignment = CompactTaskAssignment {
                compact_task: Some(task.into()),
                context_id: 0.into(),
            };
            self.compaction
                .write()
                .await
                .compact_task_assignment
                .insert(task_id, assignment);
        }

        // In tests the compact task's contents may have been modified directly
        // while the stored assignment was not, so the (possibly modified) task
        // contents are passed straight into the report path here.
        self.report_compact_tasks(vec![ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        }])
        .await?;
        Ok(())
    }
}
1366
/// What triggered the compaction schedule request.
///
/// Only affects the `Dynamic` task type; see
/// [`CompactionState::try_sched_compaction`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleTrigger {
    /// New data arrived (e.g., `commit_epoch`). Clears the group's dynamic
    /// cooldown and records the arrival time.
    NewData,
    /// Periodic timer. Skipped for groups currently in dynamic cooldown.
    Periodic,
}
1375
/// A point-in-time snapshot of the compaction schedule state.
///
/// `snapshot_time` is used by `unschedule()` to detect whether new data arrived
/// after the snapshot was taken, preventing incorrect cooldown.
pub struct CompactionScheduleSnapshot {
    /// Copy of the scheduled (group, task type) pairs at snapshot time.
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    /// Recorded while holding the schedule-state lock, so it orders correctly
    /// against concurrent `try_sched_compaction` calls.
    snapshot_time: Instant,
}
1384
impl CompactionScheduleSnapshot {
    /// Task type priority order for scheduling (checked first = higher priority).
    const TASK_TYPE_PRIORITY: &[TaskType] = &[
        TaskType::Dynamic,
        TaskType::SpaceReclaim,
        TaskType::Ttl,
        TaskType::Tombstone,
        TaskType::VnodeWatermark,
    ];

    /// Time at which this snapshot was taken; consumed by
    /// `CompactionState::unschedule` for cooldown race detection.
    pub fn snapshot_time(&self) -> Instant {
        self.snapshot_time
    }

    /// Pick compaction groups and task type from this snapshot.
    ///
    /// Groups are visited in shuffled order; for each group the
    /// highest-priority scheduled task type is considered:
    /// - Groups whose picked type is `Dynamic` are collected and returned
    ///   together as one `(groups, Dynamic)` batch.
    /// - The first group whose picked type is non-`Dynamic` is returned alone
    ///   — but only if no `Dynamic` group was encountered earlier in the
    ///   shuffled order; otherwise it is skipped this round.
    ///
    /// Returns `None` when nothing is scheduled.
    pub fn pick_compaction_groups_and_type(&self) -> Option<(Vec<CompactionGroupId>, TaskType)> {
        let group_ids = self.group_ids_shuffled();
        let mut normal_groups = vec![];
        for cg_id in group_ids {
            if let Some(pick_type) = self.pick_type(cg_id) {
                if pick_type == TaskType::Dynamic {
                    normal_groups.push(cg_id);
                } else if normal_groups.is_empty() {
                    // No Dynamic group seen yet: hand out this single
                    // non-Dynamic group immediately.
                    return Some((vec![cg_id], pick_type));
                }
            }
        }
        if normal_groups.is_empty() {
            None
        } else {
            Some((normal_groups, TaskType::Dynamic))
        }
    }

    /// Returns the distinct scheduled group ids in random order, so that no
    /// group is systematically favored by iteration order.
    fn group_ids_shuffled(&self) -> Vec<CompactionGroupId> {
        let mut group_ids: Vec<_> = self.scheduled.iter().map(|(g, _)| *g).unique().collect();
        group_ids.shuffle(&mut thread_rng());
        group_ids
    }

    /// Returns the highest-priority task type scheduled for `group`, if any.
    fn pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
        Self::TASK_TYPE_PRIORITY
            .iter()
            .find(|t| self.scheduled.contains(&(group, **t)))
            .copied()
    }
}
1435
/// Tracks which (`compaction_group`, `task_type`) pairs are scheduled for compaction.
///
/// For `Dynamic` type, includes a cooldown mechanism: groups with no compaction work
/// are skipped by periodic triggers until new data arrives via `commit_epoch`.
#[derive(Debug, Default)]
pub struct CompactionState {
    // All schedule state is guarded together by one lock; see `CompactionStateInner`.
    inner: Mutex<CompactionStateInner>,
}
1444
#[derive(Debug, Default)]
struct CompactionStateInner {
    /// The (group, task type) pairs currently scheduled for compaction.
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    /// Groups skipped by periodic Dynamic trigger until new data arrives.
    dynamic_cooldown: HashSet<CompactionGroupId>,
    /// Tracks new-data arrival time per group for cooldown race detection.
    last_new_data_time: HashMap<CompactionGroupId, Instant>,
}
1453
1454impl CompactionState {
1455    pub fn new() -> Self {
1456        Self {
1457            inner: Default::default(),
1458        }
1459    }
1460
1461    /// Enqueues a compaction request. Returns `true` if newly scheduled.
1462    ///
1463    /// `trigger` only affects `Dynamic` type — see [`ScheduleTrigger`].
1464    pub fn try_sched_compaction(
1465        &self,
1466        compaction_group: CompactionGroupId,
1467        task_type: TaskType,
1468        trigger: ScheduleTrigger,
1469    ) -> bool {
1470        let mut guard = self.inner.lock();
1471        if task_type == TaskType::Dynamic {
1472            match trigger {
1473                ScheduleTrigger::NewData => {
1474                    guard.dynamic_cooldown.remove(&compaction_group);
1475                    guard
1476                        .last_new_data_time
1477                        .insert(compaction_group, Instant::now());
1478                }
1479                ScheduleTrigger::Periodic => {
1480                    if guard.dynamic_cooldown.contains(&compaction_group) {
1481                        return false;
1482                    }
1483                }
1484            }
1485        }
1486        guard.scheduled.insert((compaction_group, task_type))
1487    }
1488
1489    /// Removes a scheduled entry. For Dynamic type, adds to cooldown unless
1490    /// new data arrived after `snapshot_time`.
1491    pub fn unschedule(
1492        &self,
1493        compaction_group: CompactionGroupId,
1494        task_type: compact_task::TaskType,
1495        snapshot_time: Instant,
1496    ) {
1497        let mut guard = self.inner.lock();
1498        guard.scheduled.remove(&(compaction_group, task_type));
1499        if task_type == TaskType::Dynamic {
1500            let has_new_data = guard
1501                .last_new_data_time
1502                .get(&compaction_group)
1503                .is_some_and(|t| *t > snapshot_time);
1504            if !has_new_data {
1505                guard.dynamic_cooldown.insert(compaction_group);
1506            }
1507        }
1508    }
1509
1510    /// Takes a snapshot of the current schedule state.
1511    pub fn snapshot(&self) -> CompactionScheduleSnapshot {
1512        let guard = self.inner.lock();
1513        // Record time after lock to ensure accurate ordering vs. try_sched_compaction
1514        let snapshot_time = Instant::now();
1515        CompactionScheduleSnapshot {
1516            scheduled: guard.scheduled.clone(),
1517            snapshot_time,
1518        }
1519    }
1520
1521    /// Removes all schedule state for a deleted or merged group.
1522    pub fn remove_compaction_group(&self, compaction_group: CompactionGroupId) {
1523        let mut guard = self.inner.lock();
1524        guard
1525            .scheduled
1526            .retain(|(group, _)| *group != compaction_group);
1527        guard.dynamic_cooldown.remove(&compaction_group);
1528        guard.last_new_data_time.remove(&compaction_group);
1529    }
1530}
1531
1532impl Compaction {
1533    pub fn get_compact_task_assignments_by_group_id(
1534        &self,
1535        compaction_group_id: CompactionGroupId,
1536    ) -> Vec<CompactTaskAssignment> {
1537        self.compact_task_assignment
1538            .iter()
1539            .filter_map(|(_, assignment)| {
1540                if assignment
1541                    .compact_task
1542                    .as_ref()
1543                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
1544                {
1545                    Some(CompactTaskAssignment {
1546                        compact_task: assignment.compact_task.clone(),
1547                        context_id: assignment.context_id,
1548                    })
1549                } else {
1550                    None
1551                }
1552            })
1553            .collect()
1554    }
1555}
1556
/// Aggregated statistics for a single compaction group.
#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    // Id of the compaction group these statistics describe.
    pub group_id: CompactionGroupId,
    // Total size of the group (presumably bytes; sum over member tables —
    // confirm against the producer of this struct).
    pub group_size: u64,
    // Per-state-table size breakdown within the group.
    pub table_statistic: BTreeMap<StateTableId, u64>,
    // Compaction configuration currently attached to this group.
    pub compaction_group_config: CompactionGroup,
}
1564
1565/// Updates table stats caused by vnode watermark trivial reclaim compaction.
1566fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1567    table_stats: &mut PbTableStatsMap,
1568    task: &CompactTask,
1569) {
1570    if task.task_type != TaskType::VnodeWatermark {
1571        return;
1572    }
1573    let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
1574    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1575        assert_eq!(s.table_ids.len(), 1);
1576        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1577        *e += s.total_key_count;
1578    }
1579    for (table_id, delete_count) in deleted_table_keys {
1580        let Some(stats) = table_stats.get_mut(&table_id) else {
1581            continue;
1582        };
1583        if stats.total_key_count == 0 {
1584            continue;
1585        }
1586        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1587        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1588        // total_key_count is updated accurately.
1589        stats.total_key_count = new_total_key_count;
1590        // others are updated approximately.
1591        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1592        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1593    }
1594}
1595
/// Health state of a compaction group with respect to its L0 thresholds.
/// Produced by `GroupStateValidator`.
#[derive(Debug, Clone)]
pub enum GroupState {
    /// The compaction group is not in emergency state.
    Normal,

    /// The compaction group is in emergency state.
    Emergency(String), // reason

    /// The compaction group is in write stop state.
    WriteStop(String), // reason
}
1607
1608impl GroupState {
1609    pub fn is_write_stop(&self) -> bool {
1610        matches!(self, Self::WriteStop(_))
1611    }
1612
1613    pub fn is_emergency(&self) -> bool {
1614        matches!(self, Self::Emergency(_))
1615    }
1616
1617    pub fn reason(&self) -> Option<&str> {
1618        match self {
1619            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
1620            _ => None,
1621        }
1622    }
1623}
1624
/// Stateless helper that checks a compaction group's L0 shape against the
/// write-stop and emergency thresholds from its `CompactionConfig`.
#[derive(Clone, Default)]
pub struct GroupStateValidator;
1627
1628impl GroupStateValidator {
1629    pub fn write_stop_sub_level_count(
1630        level_count: usize,
1631        compaction_config: &CompactionConfig,
1632    ) -> bool {
1633        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1634        level_count > threshold
1635    }
1636
1637    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1638        l0_size
1639            > compaction_config
1640                .level0_stop_write_threshold_max_size
1641                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1642    }
1643
1644    pub fn write_stop_l0_file_count(
1645        l0_file_count: usize,
1646        compaction_config: &CompactionConfig,
1647    ) -> bool {
1648        l0_file_count
1649            > compaction_config
1650                .level0_stop_write_threshold_max_sst_count
1651                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1652                as usize
1653    }
1654
1655    pub fn emergency_l0_file_count(
1656        l0_file_count: usize,
1657        compaction_config: &CompactionConfig,
1658    ) -> bool {
1659        l0_file_count
1660            > compaction_config
1661                .emergency_level0_sst_file_count
1662                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1663                as usize
1664    }
1665
1666    pub fn emergency_l0_partition_count(
1667        last_l0_sub_level_partition_count: usize,
1668        compaction_config: &CompactionConfig,
1669    ) -> bool {
1670        last_l0_sub_level_partition_count
1671            > compaction_config
1672                .emergency_level0_sub_level_partition
1673                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1674                as usize
1675    }
1676
1677    pub fn check_single_group_write_stop(
1678        levels: &Levels,
1679        compaction_config: &CompactionConfig,
1680    ) -> GroupState {
1681        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1682            return GroupState::WriteStop(format!(
1683                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1684                levels.l0.sub_levels.len(),
1685                compaction_config.level0_stop_write_threshold_sub_level_number
1686            ));
1687        }
1688
1689        if Self::write_stop_l0_file_count(
1690            levels
1691                .l0
1692                .sub_levels
1693                .iter()
1694                .map(|l| l.table_infos.len())
1695                .sum(),
1696            compaction_config,
1697        ) {
1698            return GroupState::WriteStop(format!(
1699                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1700                levels
1701                    .l0
1702                    .sub_levels
1703                    .iter()
1704                    .map(|l| l.table_infos.len())
1705                    .sum::<usize>(),
1706                compaction_config
1707                    .level0_stop_write_threshold_max_sst_count
1708                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1709            ));
1710        }
1711
1712        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1713            return GroupState::WriteStop(format!(
1714                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1715                levels.l0.total_file_size,
1716                compaction_config
1717                    .level0_stop_write_threshold_max_size
1718                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1719            ));
1720        }
1721
1722        GroupState::Normal
1723    }
1724
1725    pub fn check_single_group_emergency(
1726        levels: &Levels,
1727        compaction_config: &CompactionConfig,
1728    ) -> GroupState {
1729        if Self::emergency_l0_file_count(
1730            levels
1731                .l0
1732                .sub_levels
1733                .iter()
1734                .map(|l| l.table_infos.len())
1735                .sum(),
1736            compaction_config,
1737        ) {
1738            return GroupState::Emergency(format!(
1739                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1740                levels
1741                    .l0
1742                    .sub_levels
1743                    .iter()
1744                    .map(|l| l.table_infos.len())
1745                    .sum::<usize>(),
1746                compaction_config
1747                    .emergency_level0_sst_file_count
1748                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1749            ));
1750        }
1751
1752        if Self::emergency_l0_partition_count(
1753            levels
1754                .l0
1755                .sub_levels
1756                .first()
1757                .map(|l| l.table_infos.len())
1758                .unwrap_or(0),
1759            compaction_config,
1760        ) {
1761            return GroupState::Emergency(format!(
1762                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1763                levels
1764                    .l0
1765                    .sub_levels
1766                    .first()
1767                    .map(|l| l.table_infos.len())
1768                    .unwrap_or(0),
1769                compaction_config
1770                    .emergency_level0_sub_level_partition
1771                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1772            ));
1773        }
1774
1775        GroupState::Normal
1776    }
1777
1778    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1779        let state = Self::check_single_group_write_stop(levels, compaction_config);
1780        if state.is_write_stop() {
1781            return state;
1782        }
1783
1784        Self::check_single_group_emergency(levels, compaction_config)
1785    }
1786}
1787
#[cfg(test)]
mod prefetched_task_id_tests {
    use crate::manager::MetaOpts;

    /// The default refill capacity for prefetched compaction task ids is
    /// expected to stay at 64 unless changed deliberately.
    #[test]
    fn test_compaction_task_id_refill_capacity_default() {
        let opts = MetaOpts::test(false);
        assert_eq!(opts.compaction_task_id_refill_capacity, 64);
    }
}
1797
#[cfg(test)]
mod compaction_state_tests {
    //! Unit tests for `CompactionState`: schedule/unschedule bookkeeping, the
    //! dynamic-compaction cooldown set, and snapshot-based task-type picking.

    use risingwave_pb::hummock::compact_task::TaskType;

    use super::*;

    /// Scheduling is deduplicated per (group, task type); unschedule removes
    /// exactly the given pair and leaves other types scheduled.
    #[test]
    fn test_basic_schedule_and_unschedule() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // First schedule should succeed
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        // Duplicate schedule should fail
        assert!(!state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        // Different task type should succeed
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));

        // Snapshot should contain both
        let snapshot = state.snapshot();
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Ttl)));

        // Unschedule removes from scheduled set
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        let snapshot2 = state.snapshot();
        assert!(!snapshot2.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot2.scheduled.contains(&(group_id, TaskType::Ttl)));
    }

    /// Unscheduling a Dynamic task with no new data since the snapshot puts
    /// the group into cooldown, which blocks Periodic re-triggering.
    #[test]
    fn test_cooldown_blocks_periodic_trigger() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Schedule then unschedule - should add to cooldown
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        // Verify in cooldown
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        // Periodic trigger should be blocked
        assert!(!state.try_sched_compaction(
            group_id,
            TaskType::Dynamic,
            ScheduleTrigger::Periodic
        ));
    }

    /// A NewData trigger clears an existing cooldown and schedules the group.
    #[test]
    fn test_new_data_clears_cooldown() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Put group in cooldown
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        // NewData trigger should clear cooldown and schedule
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id));
    }

    /// Cooldown only applies to the Dynamic task type; other types are never
    /// added to the cooldown set and are not blocked by it.
    #[test]
    fn test_cooldown_only_affects_dynamic_type() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Put group in cooldown for Dynamic
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        // Ttl unschedule should NOT add to cooldown
        let group_id_2: CompactionGroupId = 2.into();
        assert!(state.try_sched_compaction(group_id_2, TaskType::Ttl, ScheduleTrigger::Periodic));
        let snapshot2 = state.snapshot();
        state.unschedule(group_id_2, TaskType::Ttl, snapshot2.snapshot_time());
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id_2));

        // Other task types should work regardless of cooldown
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        assert!(state.try_sched_compaction(
            group_id,
            TaskType::SpaceReclaim,
            ScheduleTrigger::Periodic
        ));
    }

    /// If new data arrives after the snapshot was taken, unschedule must NOT
    /// put the group into cooldown (the last_new_data_time is newer than the
    /// snapshot time).
    #[test]
    fn test_race_condition_new_data_after_snapshot() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();

        // Simulate new data arriving AFTER snapshot
        {
            let mut guard = state.inner.lock();
            guard.last_new_data_time.insert(group_id, Instant::now());
        }

        // Unschedule should NOT add to cooldown (new data arrived after snapshot)
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(
            !state.inner.lock().dynamic_cooldown.contains(&group_id),
            "Should skip cooldown when new data arrived after snapshot"
        );
    }

    /// Removing a group clears its scheduled entries, cooldown membership,
    /// and new-data timestamp.
    #[test]
    fn test_remove_compaction_group_cleans_all_state() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Set up state
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        state.inner.lock().dynamic_cooldown.insert(group_id);

        // Remove group
        state.remove_compaction_group(group_id);

        // Verify all state cleaned up
        let guard = state.inner.lock();
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Ttl)));
        assert!(!guard.dynamic_cooldown.contains(&group_id));
        assert!(!guard.last_new_data_time.contains_key(&group_id));
    }

    /// `pick_type` returns the highest-priority scheduled type:
    /// Dynamic > SpaceReclaim > Ttl > Tombstone > VnodeWatermark.
    #[test]
    fn test_snapshot_pick_type_priority() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Empty group returns None
        assert_eq!(state.snapshot().pick_type(group_id), None);

        // Priority order: Dynamic > SpaceReclaim > Ttl > Tombstone > VnodeWatermark
        state.try_sched_compaction(
            group_id,
            TaskType::VnodeWatermark,
            ScheduleTrigger::Periodic,
        );
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::VnodeWatermark)
        );

        state.try_sched_compaction(group_id, TaskType::Tombstone, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Tombstone)
        );

        state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic);
        assert_eq!(state.snapshot().pick_type(group_id), Some(TaskType::Ttl));

        state.try_sched_compaction(group_id, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::SpaceReclaim)
        );

        state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Dynamic)
        );
    }

    /// Cooldown state is tracked per group; unscheduling one group does not
    /// affect another.
    #[test]
    fn test_multiple_groups_independent_cooldown() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);
        let snapshot = state.snapshot();

        // Only unschedule g1
        state.unschedule(g1, TaskType::Dynamic, snapshot.snapshot_time());

        let guard = state.inner.lock();
        assert!(guard.dynamic_cooldown.contains(&g1));
        assert!(!guard.dynamic_cooldown.contains(&g2));
    }

    /// With nothing scheduled there is nothing to pick.
    #[test]
    fn test_pick_compaction_groups_empty() {
        let state = CompactionState::new();
        let snapshot = state.snapshot();
        // No scheduled groups → returns None
        assert!(snapshot.pick_compaction_groups_and_type().is_none());
    }

    /// Picking is randomized (shuffle), so with mixed task types either the
    /// Ttl group alone or all Dynamic groups may be returned — both outcomes
    /// are valid and asserted below.
    #[test]
    fn test_pick_compaction_groups_mixed_types() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();
        let g3: CompactionGroupId = 3.into();

        // g1: Dynamic, g2: Ttl, g3: Dynamic
        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Ttl, ScheduleTrigger::Periodic);
        state.try_sched_compaction(g3, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();

        // Due to shuffle, either:
        // - Ttl group is encountered first → returns (vec![g2], Ttl)
        // - Dynamic group is encountered first → collects all Dynamic, skips Ttl
        //   → returns ([g1, g3] in some order, Dynamic)
        if task_type == TaskType::Dynamic {
            assert!(groups.contains(&g1));
            assert!(groups.contains(&g3));
            assert!(!groups.contains(&g2)); // Ttl group excluded from Dynamic result
        } else {
            assert_eq!(task_type, TaskType::Ttl);
            assert_eq!(groups, vec![g2]);
        }
    }

    /// When every scheduled entry is Dynamic, all groups are picked together.
    #[test]
    fn test_pick_compaction_groups_all_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::Dynamic);
        assert!(groups.contains(&g1));
        assert!(groups.contains(&g2));
    }

    /// A single non-Dynamic entry is returned alone with its own task type.
    #[test]
    fn test_pick_compaction_groups_single_non_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();

        state.try_sched_compaction(g1, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::SpaceReclaim);
        assert_eq!(groups, vec![g1]);
    }
}