Skip to main content

risingwave_meta/hummock/manager/compaction/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17use std::time::Instant;
18
19use anyhow::Context;
20use compaction_event_loop::{
21    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
22    HummockCompactorDedicatedEventLoop,
23};
24use fail::fail_point;
25use itertools::Itertools;
26use parking_lot::Mutex;
27use rand::rng as thread_rng;
28use rand::seq::SliceRandom;
29use risingwave_common::catalog::TableId;
30use risingwave_common::config::meta::default::compaction_config;
31use risingwave_common::util::epoch::Epoch;
32use risingwave_hummock_sdk::compact_task::{CompactTask, CompactTaskAssignment, ReportTask};
33use risingwave_hummock_sdk::compaction_group::StateTableId;
34use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_table_watermarks_impl;
35use risingwave_hummock_sdk::level::Levels;
36use risingwave_hummock_sdk::sstable_info::SstableInfo;
37use risingwave_hummock_sdk::table_stats::{
38    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
39};
40use risingwave_hummock_sdk::table_watermark::TableWatermarks;
41use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
42use risingwave_hummock_sdk::{
43    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
44    HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
45};
46use risingwave_meta_model::hummock_sequence::COMPACTION_TASK_ID;
47use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
48use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
49use risingwave_pb::hummock::{
50    CompactTaskAssignment as PbCompactTaskAssignment, CompactionConfig, PbCompactStatus,
51    SubscribeCompactionEventRequest, TableOption, compact_task,
52};
53use thiserror_ext::AsReport;
54use tokio::sync::RwLockWriteGuard;
55use tokio::sync::mpsc::UnboundedReceiver;
56use tokio::sync::oneshot::{Receiver, Sender};
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use crate::hummock::compaction::in_progress_compaction::InProgressCompactionView;
62use crate::hummock::compaction::selector::level_selector::PickerInfo;
63use crate::hummock::compaction::selector::{
64    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
65    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
66    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
67};
68use crate::hummock::compaction::{
69    CompactStatus, CompactionDeveloperConfig, CompactionSelector,
70    CompactionTask as PickedCompactionTask,
71};
72use crate::hummock::error::{Error, Result};
73use crate::hummock::manager::CompactionTaskReportResult;
74use crate::hummock::manager::compaction::compact_task_builder::{
75    CompactTaskBuildContext, attach_compact_task_table_metadata, build_base_compact_task,
76};
77use crate::hummock::manager::transaction::{
78    HummockVersionStatsTransaction, HummockVersionTransaction,
79};
80use crate::hummock::manager::versioning::Versioning;
81use crate::hummock::metrics_utils::{
82    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
83    trigger_local_table_stat,
84};
85use crate::hummock::model::CompactionGroup;
86use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
87use crate::manager::META_NODE_ID;
88use crate::model::BTreeMapTransaction;
89
/// Outcome of an attempt to trigger a manual compaction.
///
/// NOTE(review): the variant meanings below are inferred from their names only; the code that
/// produces and consumes them is not part of this chunk — confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ManualCompactionTriggerResult {
    /// Presumably: the manual compaction request was accepted.
    Submitted,
    /// Presumably: the request could not be served right now and should be retried.
    Retry,
}
95
96mod compact_task_builder;
97pub mod compaction_event_loop;
98pub mod compaction_group_manager;
99pub mod compaction_group_schedule;
100
101static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
102    [
103        TaskStatus::ManualCanceled,
104        TaskStatus::SendFailCanceled,
105        TaskStatus::AssignFailCanceled,
106        TaskStatus::HeartbeatCanceled,
107        TaskStatus::InvalidGroupCanceled,
108        TaskStatus::NoAvailMemoryResourceCanceled,
109        TaskStatus::NoAvailCpuResourceCanceled,
110        TaskStatus::HeartbeatProgressCanceled,
111    ]
112    .into_iter()
113    .collect()
114});
115
116fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
117    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
118        HashMap::default();
119    compaction_selectors.insert(
120        compact_task::TaskType::Dynamic,
121        Box::<DynamicLevelSelector>::default(),
122    );
123    compaction_selectors.insert(
124        compact_task::TaskType::SpaceReclaim,
125        Box::<SpaceReclaimCompactionSelector>::default(),
126    );
127    compaction_selectors.insert(
128        compact_task::TaskType::Ttl,
129        Box::<TtlCompactionSelector>::default(),
130    );
131    compaction_selectors.insert(
132        compact_task::TaskType::Tombstone,
133        Box::<TombstoneCompactionSelector>::default(),
134    );
135    compaction_selectors.insert(
136        compact_task::TaskType::VnodeWatermark,
137        Box::<VnodeWatermarkCompactionSelector>::default(),
138    );
139    compaction_selectors
140}
141
142enum BuiltCompactTask {
143    MetaFinished(CompactTask),
144    PendingAssignment(CompactTask),
145}
146
147impl HummockVersionTransaction<'_> {
148    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
149        let mut version_delta = self.new_delta();
150        let trivial_move = compact_task.is_trivial_move_task();
151        version_delta.trivial_move = trivial_move;
152
153        let group_deltas = &mut version_delta
154            .group_deltas
155            .entry(compact_task.compaction_group_id)
156            .or_default()
157            .group_deltas;
158        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
159            BTreeMap::default();
160
161        for level in &compact_task.input_ssts {
162            let level_idx = level.level_idx;
163
164            removed_table_ids_map
165                .entry(level_idx)
166                .or_default()
167                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
168        }
169
170        for (level_idx, removed_table_ids) in removed_table_ids_map {
171            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
172                level_idx,
173                0, // default
174                removed_table_ids,
175                vec![], // default
176                0,      // default
177                compact_task.compaction_group_version_id,
178            ));
179
180            group_deltas.push(group_delta);
181        }
182
183        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
184            compact_task.target_level,
185            compact_task.target_sub_level_id,
186            HashSet::new(), // default
187            compact_task.sorted_output_ssts.clone(),
188            compact_task.split_weight_by_vnode,
189            compact_task.compaction_group_version_id,
190        ));
191
192        group_deltas.push(group_delta);
193        version_delta.pre_apply();
194    }
195}
196
197#[derive(Default)]
198pub struct Compaction {
199    /// Compaction task that is already assigned to a compactor
200    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, CompactTaskAssignment>,
201    /// `CompactStatus` of each compaction group
202    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,
203
204    pub _deterministic_mode: bool,
205}
206
207impl HummockManager {
208    pub async fn get_assigned_compact_task_num(&self) -> u64 {
209        self.compaction.read().await.compact_task_assignment.len() as u64
210    }
211
212    pub async fn list_compaction_status(
213        &self,
214    ) -> (Vec<PbCompactStatus>, Vec<PbCompactTaskAssignment>) {
215        let (compaction_statuses, compact_task_assignments) = {
216            let compaction = self.compaction.read().await;
217            (
218                compaction
219                    .compaction_statuses
220                    .values()
221                    .map_into()
222                    .collect_vec(),
223                compaction
224                    .compact_task_assignment
225                    .values()
226                    .cloned()
227                    .collect_vec(),
228            )
229        };
230
231        (
232            compaction_statuses,
233            compact_task_assignments
234                .into_iter()
235                .map(PbCompactTaskAssignment::from)
236                .collect(),
237        )
238    }
239
240    pub async fn get_compaction_scores(
241        &self,
242        compaction_group_id: CompactionGroupId,
243    ) -> Vec<PickerInfo> {
244        let (status, levels, group) = {
245            let compaction = self.compaction.read().await;
246            let versioning = self.versioning.read().await;
247            let config_manager = self.compaction_group_manager.read().await;
248            match (
249                compaction.compaction_statuses.get(&compaction_group_id),
250                versioning.current_version.levels.get(&compaction_group_id),
251                config_manager.try_get_compaction_group_config(compaction_group_id),
252            ) {
253                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
254                _ => {
255                    return vec![];
256                }
257            }
258        };
259        let dynamic_level_core = DynamicLevelSelectorCore::new(
260            group.compaction_config,
261            Arc::new(CompactionDeveloperConfig::default()),
262        );
263        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
264        ctx.score_levels
265    }
266}
267
268impl HummockManager {
269    pub fn compaction_event_loop(
270        hummock_manager: Arc<Self>,
271        compactor_streams_change_rx: UnboundedReceiver<(
272            HummockContextId,
273            Streaming<SubscribeCompactionEventRequest>,
274        )>,
275    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
276        let mut join_handle_vec = Vec::default();
277
278        let hummock_compaction_event_handler =
279            HummockCompactionEventHandler::new(hummock_manager.clone());
280
281        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
282            hummock_manager.clone(),
283            hummock_compaction_event_handler.clone(),
284        );
285
286        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
287        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));
288
289        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
290            hummock_manager.env.opts.clone(),
291            hummock_compaction_event_handler,
292            Some(event_tx),
293        );
294
295        let event_loop = HummockCompactionEventLoop::new(
296            hummock_compaction_event_dispatcher,
297            hummock_manager.metrics.clone(),
298            compactor_streams_change_rx,
299        );
300
301        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
302        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
303
304        join_handle_vec
305    }
306
307    pub fn add_compactor_stream(
308        &self,
309        context_id: HummockContextId,
310        req_stream: Streaming<SubscribeCompactionEventRequest>,
311    ) {
312        self.compactor_streams_change_tx
313            .send((context_id, req_stream))
314            .unwrap();
315    }
316}
317
318impl HummockManager {
319    /// Gets one compaction task id with best-effort batching while ensuring concurrent callers
320    /// share the same refill instead of wasting an allocated range.
321    async fn next_compaction_task_id_with_prefetch(&self, refill_capacity: u32) -> Result<u64> {
322        self.prefetched_compaction_task_ids
323            .next(refill_capacity, |count| async move {
324                self.env
325                    .hummock_seq
326                    .next_interval(COMPACTION_TASK_ID, count)
327                    .await
328            })
329            .await
330    }
331
332    pub async fn get_compact_tasks_impl(
333        &self,
334        compaction_groups: Vec<CompactionGroupId>,
335        max_select_count: usize,
336        selector: &mut dyn CompactionSelector,
337    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
338        let deterministic_mode = self.env.opts.compaction_deterministic_test;
339
340        let mut compaction_guard = self.compaction.write().await;
341        let compaction: &mut Compaction = &mut compaction_guard;
342        let mut versioning_guard = self.versioning.write().await;
343        let versioning: &mut Versioning = &mut versioning_guard;
344
345        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");
346
347        let start_time = Instant::now();
348        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
349
350        let mut compact_task_assignment =
351            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
352
353        let mut version = HummockVersionTransaction::new(
354            &mut versioning.current_version,
355            &mut versioning.hummock_version_deltas,
356            &mut versioning.table_change_log,
357            self.env.notification_manager(),
358            None,
359            &self.metrics,
360            &self.env.opts,
361        );
362        // Apply stats changes.
363        let mut version_stats = HummockVersionStatsTransaction::new(
364            &mut versioning.version_stats,
365            self.env.notification_manager(),
366        );
367
368        if deterministic_mode {
369            version.disable_apply_to_txn();
370        }
371        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
372            self.metadata_manager
373                .catalog_controller
374                .get_versioned_table_schemas()
375                .await
376                .map_err(|e| Error::Internal(e.into()))?
377        } else {
378            HashMap::default()
379        };
380        let mut unschedule_groups = vec![];
381        let mut trivial_tasks = vec![];
382        let mut pick_tasks = vec![];
383        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
384            &self.env.opts,
385        ));
386        // Reuse prefetched task ids from previous loops.
387        // Each group consumes at most one task_id (trivial tasks share the same id with normal
388        // task). When prefetched ids are exhausted, refill in fixed-size chunks to avoid
389        // per-group SQL transactions while keeping the in-memory cache small.
390        'outside: for compaction_group_id in compaction_groups {
391            if pick_tasks.len() >= max_select_count {
392                break;
393            }
394
395            if !version
396                .latest_version()
397                .levels
398                .contains_key(&compaction_group_id)
399            {
400                continue;
401            }
402
403            // When the last table of a compaction group is deleted, the compaction group (and its
404            // config) is destroyed as well. Then a compaction task for this group may come later and
405            // cannot find its config.
406            let group_config = {
407                let config_manager = self.compaction_group_manager.read().await;
408
409                match config_manager.try_get_compaction_group_config(compaction_group_id) {
410                    Some(config) => config,
411                    None => continue,
412                }
413            };
414
415            // Use prefetched task id if available; when exhausted, refill in chunks first.
416            let task_id = self
417                .next_compaction_task_id_with_prefetch(
418                    self.env.opts.compaction_task_id_refill_capacity,
419                )
420                .await?;
421
422            if !compaction_statuses.contains_key(&compaction_group_id) {
423                // lazy initialize.
424                compaction_statuses.insert(
425                    compaction_group_id,
426                    CompactStatus::new(
427                        compaction_group_id,
428                        group_config.compaction_config.max_level,
429                    ),
430                );
431            }
432            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();
433
434            let mut stats = LocalSelectorStatistic::default();
435            let member_table_ids: Vec<_> = version
436                .latest_version()
437                .state_table_info
438                .compaction_group_member_table_ids(compaction_group_id)
439                .iter()
440                .copied()
441                .collect();
442
443            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();
444
445            {
446                let guard = self.table_id_to_table_option.read();
447                for table_id in &member_table_ids {
448                    if let Some(opts) = guard.get(table_id) {
449                        table_id_to_option.insert(*table_id, *opts);
450                    }
451                }
452            }
453
454            let in_progress_compactions = InProgressCompactionView::for_group(
455                compact_task_assignment.tree_ref().values(),
456                compaction_group_id,
457            );
458
459            while let Some(picked_task) = compact_status.get_compact_task(
460                version
461                    .latest_version()
462                    .get_compaction_group_levels(compaction_group_id),
463                version
464                    .latest_version()
465                    .state_table_info
466                    .compaction_group_member_table_ids(compaction_group_id),
467                task_id as HummockCompactionTaskId,
468                &group_config,
469                &mut stats,
470                selector,
471                &table_id_to_option,
472                developer_config.clone(),
473                &version.latest_version().table_watermarks,
474                &version.latest_version().state_table_info,
475                &in_progress_compactions,
476            ) {
477                let compaction_group_levels = version
478                    .latest_version()
479                    .get_compaction_group_levels(compaction_group_id);
480                let target_level_id = picked_task.input.target_level as u32;
481                let is_target_level_last = compaction_group_levels.is_last_level(target_level_id);
482                let table_options = table_id_to_option
483                    .iter()
484                    .map(|(table_id, table_option)| (*table_id, TableOption::from(table_option)))
485                    .collect();
486                let built_compact_task = self.build_ready_compact_task(
487                    picked_task,
488                    CompactTaskBuildContext {
489                        task_id,
490                        compaction_group_id: group_config.group_id,
491                        compaction_group_version_id: compaction_group_levels
492                            .compaction_group_version_id,
493                        existing_table_ids: member_table_ids.clone(),
494                        table_options,
495                        is_target_level_last,
496                        compaction_config: group_config.compaction_config.clone(),
497                        current_epoch_time: Epoch::now().0,
498                    },
499                    &version.latest_version().table_watermarks,
500                    &all_versioned_table_schemas,
501                );
502
503                match built_compact_task {
504                    BuiltCompactTask::MetaFinished(compact_task) => {
505                        let label = compact_task.task_label();
506                        tracing::debug!(
507                            "{} for compaction group {}: input: {:?}, cost time: {:?}",
508                            label,
509                            compact_task.compaction_group_id,
510                            compact_task.input_ssts,
511                            start_time.elapsed()
512                        );
513                        compact_status.report_compact_task(&compact_task);
514                        update_table_stats_for_vnode_watermark_trivial_reclaim(
515                            &mut version_stats.table_stats,
516                            &compact_task,
517                        );
518                        self.metrics
519                            .compact_frequency
520                            .with_label_values(&[
521                                label,
522                                &compact_task.compaction_group_id.to_string(),
523                                selector.task_type().as_str_name(),
524                                "SUCCESS",
525                            ])
526                            .inc();
527
528                        version.apply_compact_task(&compact_task);
529                        trivial_tasks.push(compact_task);
530                        if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop
531                        {
532                            break 'outside;
533                        }
534                    }
535                    BuiltCompactTask::PendingAssignment(compact_task) => {
536                        compact_task_assignment.insert(
537                            compact_task.task_id,
538                            CompactTaskAssignment {
539                                compact_task: compact_task.clone(),
540                                context_id: META_NODE_ID, // deprecated
541                            },
542                        );
543
544                        pick_tasks.push(compact_task);
545                        break;
546                    }
547                }
548
549                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
550                stats = LocalSelectorStatistic::default();
551            }
552            if pick_tasks
553                .last()
554                .map(|task| task.compaction_group_id != compaction_group_id)
555                .unwrap_or(true)
556            {
557                unschedule_groups.push(compaction_group_id);
558            }
559            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
560        }
561
562        if !trivial_tasks.is_empty() {
563            commit_multi_var!(
564                self.meta_store_ref(),
565                compaction_statuses,
566                compact_task_assignment,
567                version,
568                version_stats
569            )?;
570            self.metrics
571                .compact_task_batch_count
572                .with_label_values(&["batch_trivial_move"])
573                .observe(trivial_tasks.len() as f64);
574
575            for trivial_task in &trivial_tasks {
576                self.metrics
577                    .compact_task_trivial_move_sst_count
578                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
579                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
580            }
581
582            drop(versioning_guard);
583        } else {
584            // We are using a single transaction to ensure that each task has progress when it is
585            // created.
586            drop(versioning_guard);
587            commit_multi_var!(
588                self.meta_store_ref(),
589                compaction_statuses,
590                compact_task_assignment
591            )?;
592        }
593        drop(compaction_guard);
594        if !pick_tasks.is_empty() {
595            self.metrics
596                .compact_task_batch_count
597                .with_label_values(&["batch_get_compact_task"])
598                .observe(pick_tasks.len() as f64);
599        }
600
601        for compact_task in &mut pick_tasks {
602            let compaction_group_id = compact_task.compaction_group_id;
603
604            // Initiate heartbeat for the task to track its progress.
605            self.compactor_manager
606                .initiate_task_heartbeat(compact_task.clone());
607
608            // this task has been finished.
609            compact_task.task_status = TaskStatus::Pending;
610            let compact_task_statistics = statistics_compact_task(compact_task);
611
612            let level_type_label = build_compact_task_level_type_metrics_label(
613                compact_task.input_ssts[0].level_idx as usize,
614                compact_task.input_ssts.last().unwrap().level_idx as usize,
615            );
616
617            let level_count = compact_task.input_ssts.len();
618            if compact_task.input_ssts[0].level_idx == 0 {
619                self.metrics
620                    .l0_compact_level_count
621                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
622                    .observe(level_count as _);
623            }
624
625            self.metrics
626                .compact_task_size
627                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
628                .observe(compact_task_statistics.total_file_size as _);
629
630            self.metrics
631                .compact_task_size
632                .with_label_values(&[
633                    &compaction_group_id.to_string(),
634                    &format!("{} uncompressed", level_type_label),
635                ])
636                .observe(compact_task_statistics.total_uncompressed_file_size as _);
637
638            self.metrics
639                .compact_task_file_count
640                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
641                .observe(compact_task_statistics.total_file_count as _);
642
643            tracing::trace!(
644                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
645                compaction_group_id,
646                level_count,
647                compact_task.input_ssts[0].level_type.as_str_name(),
648                compact_task.input_ssts[0].level_idx,
649                compact_task.target_level,
650                start_time.elapsed(),
651                compact_task_statistics
652            );
653        }
654
655        #[cfg(test)]
656        {
657            self.check_state_consistency().await;
658        }
659        pick_tasks.extend(trivial_tasks);
660        Ok((pick_tasks, unschedule_groups))
661    }
662
663    /// Cancels a compaction task no matter it's assigned or unassigned.
664    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
665        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
666            anyhow::anyhow!("failpoint metastore err")
667        )));
668        let ret = self
669            .cancel_compact_task_impl(vec![task_id], task_status)
670            .await?;
671        Ok(ret[0])
672    }
673
674    pub async fn cancel_compact_tasks(
675        &self,
676        tasks: Vec<u64>,
677        task_status: TaskStatus,
678    ) -> Result<Vec<bool>> {
679        self.cancel_compact_task_impl(tasks, task_status).await
680    }
681
682    async fn cancel_compact_task_impl(
683        &self,
684        task_ids: Vec<u64>,
685        task_status: TaskStatus,
686    ) -> Result<Vec<bool>> {
687        assert!(CANCEL_STATUS_SET.contains(&task_status));
688        let tasks = task_ids
689            .into_iter()
690            .map(|task_id| ReportTask {
691                task_id,
692                task_status,
693                sorted_output_ssts: vec![],
694                table_stats_change: HashMap::default(),
695                object_timestamps: HashMap::default(),
696            })
697            .collect_vec();
698        let rets = self.report_compact_tasks(tasks).await?;
699        #[cfg(test)]
700        {
701            self.check_state_consistency().await;
702        }
703        Ok(rets)
704    }
705
706    async fn get_compact_tasks(
707        &self,
708        mut compaction_groups: Vec<CompactionGroupId>,
709        max_select_count: usize,
710        selector: &mut dyn CompactionSelector,
711    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
712        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
713            anyhow::anyhow!("failpoint metastore error")
714        )));
715        compaction_groups.shuffle(&mut thread_rng());
716        let (mut tasks, groups) = self
717            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
718            .await?;
719        tasks.retain(|task| {
720            if task.task_status == TaskStatus::Success {
721                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
722                false
723            } else {
724                true
725            }
726        });
727        Ok((tasks, groups))
728    }
729
730    pub async fn get_compact_task(
731        &self,
732        compaction_group_id: CompactionGroupId,
733        selector: &mut dyn CompactionSelector,
734    ) -> Result<Option<CompactTask>> {
735        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
736            anyhow::anyhow!("failpoint metastore error")
737        )));
738
739        let (normal_tasks, _) = self
740            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
741            .await?;
742        for task in normal_tasks {
743            if task.task_status != TaskStatus::Success {
744                return Ok(Some(task));
745            }
746            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
747        }
748        Ok(None)
749    }
750
751    pub async fn manual_get_compact_task(
752        &self,
753        compaction_group_id: CompactionGroupId,
754        manual_compaction_option: ManualCompactionOption,
755    ) -> Result<Option<CompactTask>> {
756        let (task, _) = self
757            .manual_get_compact_task_with_info(compaction_group_id, manual_compaction_option)
758            .await?;
759        Ok(task)
760    }
761
762    pub async fn manual_get_compact_task_with_info(
763        &self,
764        compaction_group_id: CompactionGroupId,
765        manual_compaction_option: ManualCompactionOption,
766    ) -> Result<(Option<CompactTask>, bool)> {
767        let mut selector = ManualCompactionSelector::new(manual_compaction_option);
768        let task = self
769            .get_compact_task(compaction_group_id, &mut selector)
770            .await?;
771        if let Some(err) = selector.validation_error() {
772            return Err(Error::InvalidManualCompactionOption(err.to_owned()));
773        }
774        Ok((task, selector.blocked_by_pending()))
775    }
776
777    pub async fn report_compact_task(
778        &self,
779        task_id: u64,
780        task_status: TaskStatus,
781        sorted_output_ssts: Vec<SstableInfo>,
782        table_stats_change: Option<PbTableStatsMap>,
783        object_timestamps: HashMap<HummockSstableObjectId, u64>,
784    ) -> Result<bool> {
785        let rets = self
786            .report_compact_tasks(vec![ReportTask {
787                task_id,
788                task_status,
789                sorted_output_ssts,
790                table_stats_change: table_stats_change.unwrap_or_default(),
791                object_timestamps,
792            }])
793            .await?;
794        Ok(rets[0])
795    }
796
797    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
798        let compaction_guard = self.compaction.write().await;
799        let versioning_guard = self.versioning.write().await;
800
801        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
802            .await
803    }
804
805    /// Finishes or cancels a compaction task, according to `task_status`.
806    ///
807    /// If `context_id` is not None, its validity will be checked when writing meta store.
808    /// Its ownership of the task is checked as well.
809    ///
810    /// Return Ok(false) indicates either the task is not found,
811    /// or the task is not owned by `context_id` when `context_id` is not None.
812    pub async fn report_compact_tasks_impl(
813        &self,
814        report_tasks: Vec<ReportTask>,
815        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
816        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
817    ) -> Result<Vec<bool>> {
818        let deterministic_mode = self.env.opts.compaction_deterministic_test;
819        let compaction: &mut Compaction = &mut compaction_guard;
820        let start_time = Instant::now();
821        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
822        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
823        let mut rets = vec![false; report_tasks.len()];
824        let mut compact_task_assignment =
825            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
826        // The compaction task is finished.
827        let versioning: &mut Versioning = &mut versioning_guard;
828        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");
829
830        // purge stale compact_status
831        for group_id in original_keys {
832            if !versioning.current_version.levels.contains_key(&group_id) {
833                compact_statuses.remove(group_id);
834            }
835        }
836        let mut tasks = vec![];
837
838        let mut version = HummockVersionTransaction::new(
839            &mut versioning.current_version,
840            &mut versioning.hummock_version_deltas,
841            &mut versioning.table_change_log,
842            self.env.notification_manager(),
843            None,
844            &self.metrics,
845            &self.env.opts,
846        );
847
848        if deterministic_mode {
849            version.disable_apply_to_txn();
850        }
851
852        let mut version_stats = HummockVersionStatsTransaction::new(
853            &mut versioning.version_stats,
854            self.env.notification_manager(),
855        );
856        let mut success_count = 0;
857        let mut report_results = Vec::with_capacity(rets.len());
858        for (idx, task) in report_tasks.into_iter().enumerate() {
859            rets[idx] = true;
860            let task_id = task.task_id;
861            let mut task_status = task.task_status;
862            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
863                Some(compact_task_assignment) => compact_task_assignment.compact_task,
864                None => {
865                    tracing::warn!("{}", format!("compact task {} not found", task.task_id));
866                    rets[idx] = false;
867                    report_results.push(CompactionTaskReportResult {
868                        task_id,
869                        task_status,
870                        reported: false,
871                    });
872                    continue;
873                }
874            };
875
876            {
877                // apply result
878                compact_task.task_status = task.task_status;
879                compact_task.sorted_output_ssts = task.sorted_output_ssts;
880            }
881
882            match compact_statuses.get_mut(compact_task.compaction_group_id) {
883                Some(mut compact_status) => {
884                    compact_status.report_compact_task(&compact_task);
885                }
886                None => {
887                    // When the group_id is not found in the compaction_statuses, it means the group has been removed.
888                    // The task is invalid and should be canceled.
889                    // e.g.
890                    // 1. The group is removed by the user unregistering the tables
891                    // 2. The group is removed by the group scheduling algorithm
892                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
893                }
894            }
895
896            let is_success = if let TaskStatus::Success = compact_task.task_status {
897                match self
898                    .report_compaction_sanity_check(&task.object_timestamps)
899                    .await
900                {
901                    Err(e) => {
902                        warn!(
903                            "failed to commit compaction task {} {}",
904                            compact_task.task_id,
905                            e.as_report()
906                        );
907                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
908                        false
909                    }
910                    _ => {
911                        let group = version
912                            .latest_version()
913                            .levels
914                            .get(&compact_task.compaction_group_id)
915                            .unwrap();
916                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
917                        if is_expired {
918                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
919                            warn!(
920                                "The task may be expired because of group split, task:\n {:?}",
921                                compact_task_to_string(&compact_task)
922                            );
923                        }
924                        !is_expired
925                    }
926                }
927            } else {
928                false
929            };
930            if is_success {
931                success_count += 1;
932                version.apply_compact_task(&compact_task);
933                if purge_prost_table_stats(
934                    &mut version_stats.table_stats,
935                    version.latest_version(),
936                    &HashSet::default(),
937                ) {
938                    self.metrics.version_stats.reset();
939                    versioning.local_metrics.clear();
940                }
941                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
942                trigger_local_table_stat(
943                    &self.metrics,
944                    &mut versioning.local_metrics,
945                    &version_stats,
946                    &task.table_stats_change,
947                );
948            }
949            task_status = compact_task.task_status;
950            report_results.push(CompactionTaskReportResult {
951                task_id,
952                task_status,
953                reported: rets[idx],
954            });
955            tasks.push(compact_task);
956        }
957        if success_count > 0 {
958            commit_multi_var!(
959                self.meta_store_ref(),
960                compact_statuses,
961                compact_task_assignment,
962                version,
963                version_stats
964            )?;
965
966            self.metrics
967                .compact_task_batch_count
968                .with_label_values(&["batch_report_task"])
969                .observe(success_count as f64);
970        } else {
971            // The compaction task is cancelled or failed.
972            commit_multi_var!(
973                self.meta_store_ref(),
974                compact_statuses,
975                compact_task_assignment
976            )?;
977        }
978
979        self.notify_compaction_task_report_waiters(report_results);
980
981        let mut success_groups = vec![];
982        for compact_task in &tasks {
983            self.compactor_manager
984                .remove_task_heartbeat(compact_task.task_id);
985            tracing::trace!(
986                "Reported compaction task. {}. cost time: {:?}",
987                compact_task_to_string(compact_task),
988                start_time.elapsed(),
989            );
990
991            if !deterministic_mode
992                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
993                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
994            {
995                // only try send Dynamic compaction
996                self.try_send_compaction_request(
997                    compact_task.compaction_group_id,
998                    compact_task::TaskType::Dynamic,
999                );
1000            }
1001
1002            if compact_task.task_status == TaskStatus::Success {
1003                success_groups.push(compact_task.compaction_group_id);
1004            }
1005        }
1006
1007        trigger_compact_tasks_stat(
1008            &self.metrics,
1009            &tasks,
1010            &compaction.compaction_statuses,
1011            &versioning_guard.current_version,
1012        );
1013        drop(versioning_guard);
1014        if !success_groups.is_empty() {
1015            self.try_update_write_limits(&success_groups).await;
1016        }
1017        Ok(rets)
1018    }
1019
1020    /// Triggers compacitons to specified compaction groups.
1021    /// Don't wait for compaction finish
1022    pub async fn trigger_compaction_deterministic(
1023        &self,
1024        _base_version_id: HummockVersionId,
1025        compaction_groups: Vec<CompactionGroupId>,
1026    ) -> Result<()> {
1027        self.on_current_version(|old_version| {
1028            tracing::info!(
1029                "Trigger compaction for version {}, groups {:?}",
1030                old_version.id,
1031                compaction_groups
1032            );
1033        })
1034        .await;
1035
1036        if compaction_groups.is_empty() {
1037            return Ok(());
1038        }
1039        for compaction_group in compaction_groups {
1040            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1041        }
1042        Ok(())
1043    }
1044
1045    pub async fn trigger_manual_compaction(
1046        &self,
1047        compaction_group: CompactionGroupId,
1048        manual_compaction_option: ManualCompactionOption,
1049    ) -> Result<ManualCompactionTriggerResult> {
1050        let start_time = Instant::now();
1051        let exclusive = manual_compaction_option.exclusive;
1052
1053        // 1. Get idle compactor.
1054        let compactor = match self.compactor_manager.next_compactor() {
1055            Some(compactor) => compactor,
1056            None => {
1057                tracing::warn!("trigger_manual_compaction No compactor is available.");
1058                return Err(anyhow::anyhow!(
1059                    "trigger_manual_compaction No compactor is available. compaction_group {}",
1060                    compaction_group
1061                )
1062                .into());
1063            }
1064        };
1065
1066        // 2. Get manual compaction task.
1067        let compact_task = self
1068            .manual_get_compact_task_with_info(compaction_group, manual_compaction_option)
1069            .await;
1070        let (compact_task, blocked_by_pending) = match compact_task {
1071            Ok((compact_task, blocked_by_pending)) => (compact_task, blocked_by_pending),
1072            Err(err) => {
1073                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
1074                if matches!(err, Error::InvalidManualCompactionOption(_)) {
1075                    return Err(err);
1076                }
1077
1078                return Err(anyhow::anyhow!(err)
1079                    .context(format!(
1080                        "Failed to get compaction task for compaction_group {}",
1081                        compaction_group,
1082                    ))
1083                    .into());
1084            }
1085        };
1086        let compact_task = match compact_task {
1087            Some(compact_task) => compact_task,
1088            None => {
1089                if exclusive && blocked_by_pending {
1090                    return Ok(ManualCompactionTriggerResult::Retry);
1091                }
1092                // No compaction task available.
1093                return Err(anyhow::anyhow!(
1094                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
1095                    compaction_group
1096                )
1097                .into());
1098            }
1099        };
1100
1101        // 3. send task to compactor
1102        let task_id = compact_task.task_id;
1103        let compact_task_string = compact_task_to_string(&compact_task);
1104        tracing::info!(
1105            compact_task_string,
1106            duration = ?start_time.elapsed(),
1107            "Triggered manual compaction task."
1108        );
1109
1110        let report_rx = self.register_compaction_task_report_waiter(task_id);
1111        if let Err(err) = compactor
1112            .send_event(ResponseEvent::CompactTask(compact_task.into()))
1113            .with_context(|| {
1114                format!(
1115                    "Failed to trigger compaction task for compaction_group {}",
1116                    compaction_group,
1117                )
1118            })
1119        {
1120            self.remove_compaction_task_report_waiter(task_id);
1121            return Err(err.into());
1122        }
1123
1124        let report_result = match report_rx.await {
1125            Ok(result) => result,
1126            Err(_) => {
1127                self.remove_compaction_task_report_waiter(task_id);
1128                return Err(anyhow::anyhow!(
1129                    "trigger_manual_compaction wait report failed. compaction_group {}",
1130                    compaction_group
1131                )
1132                .into());
1133            }
1134        };
1135        if !report_result.reported {
1136            return Err(anyhow::anyhow!(
1137                "trigger_manual_compaction report not accepted. task_id {}",
1138                report_result.task_id
1139            )
1140            .into());
1141        }
1142
1143        if report_result.task_status == TaskStatus::NoAvailCpuResourceCanceled
1144            || report_result.task_status == TaskStatus::NoAvailMemoryResourceCanceled
1145        {
1146            return Ok(ManualCompactionTriggerResult::Retry);
1147        }
1148
1149        tracing::info!(
1150            ?report_result,
1151            duration = ?start_time.elapsed(),
1152            "Completed manual compaction task."
1153        );
1154
1155        Ok(ManualCompactionTriggerResult::Submitted)
1156    }
1157
1158    /// Sends a compaction request for new data (clears cooldown).
1159    pub fn try_send_compaction_request(
1160        &self,
1161        compaction_group: CompactionGroupId,
1162        task_type: compact_task::TaskType,
1163    ) -> bool {
1164        self.compaction_state.try_sched_compaction(
1165            compaction_group,
1166            task_type,
1167            ScheduleTrigger::NewData,
1168        )
1169    }
1170
1171    /// Apply `split_weight_by_vnode` based partition strategy.
1172    /// This handles dynamic partitioning based on table size and write throughput.
1173    fn apply_split_weight_by_vnode_partition(
1174        &self,
1175        compact_task: &mut CompactTask,
1176        compaction_config: &CompactionConfig,
1177        compact_table_ids: &[TableId],
1178    ) {
1179        if compaction_config.split_weight_by_vnode > 0 {
1180            for table_id in compact_table_ids {
1181                compact_task
1182                    .table_vnode_partition
1183                    .insert(*table_id, compact_task.split_weight_by_vnode);
1184            }
1185
1186            return;
1187        }
1188
1189        // Calculate per-table size from normalized input SSTs.
1190        let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
1191        for input_ssts in &compact_task.input_ssts {
1192            for sst in &input_ssts.table_infos {
1193                for table_id in &sst.table_ids {
1194                    *table_size_info.entry(*table_id).or_default() +=
1195                        sst.sst_size / (sst.table_ids.len() as u64);
1196                }
1197            }
1198        }
1199
1200        let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
1201        let default_partition_count = self.env.opts.partition_vnode_count;
1202        let compact_task_table_size_partition_threshold_low = self
1203            .env
1204            .opts
1205            .compact_task_table_size_partition_threshold_low;
1206        let compact_task_table_size_partition_threshold_high = self
1207            .env
1208            .opts
1209            .compact_task_table_size_partition_threshold_high;
1210
1211        // Check latest write throughput
1212        let table_write_throughput_statistic_manager =
1213            self.table_write_throughput_statistic_manager.read();
1214        let timestamp = chrono::Utc::now().timestamp();
1215
1216        for (table_id, compact_table_size) in table_size_info {
1217            let write_throughput = table_write_throughput_statistic_manager
1218                .get_table_throughput_descending(table_id, timestamp)
1219                .peekable()
1220                .peek()
1221                .map(|item| item.throughput)
1222                .unwrap_or(0);
1223
1224            if compact_table_size > compact_task_table_size_partition_threshold_high
1225                && default_partition_count > 0
1226            {
1227                compact_task
1228                    .table_vnode_partition
1229                    .insert(table_id, default_partition_count);
1230            } else if (compact_table_size > compact_task_table_size_partition_threshold_low
1231                || (write_throughput > self.env.opts.table_high_write_throughput_threshold
1232                    && compact_table_size > compaction_config.target_file_size_base))
1233                && hybrid_vnode_count > 0
1234            {
1235                compact_task
1236                    .table_vnode_partition
1237                    .insert(table_id, hybrid_vnode_count);
1238            } else if compact_table_size > compaction_config.target_file_size_base {
1239                compact_task.table_vnode_partition.insert(table_id, 1);
1240            }
1241        }
1242
1243        compact_task
1244            .table_vnode_partition
1245            .retain(|table_id, _| compact_table_ids.contains(table_id));
1246    }
1247
1248    pub(crate) fn calculate_vnode_partition(
1249        &self,
1250        compact_task: &mut CompactTask,
1251        compaction_config: &CompactionConfig,
1252        compact_table_ids: &[TableId],
1253    ) {
1254        // Do not split sst by vnode partition when target_level > base_level
1255        // The purpose of data alignment is mainly to improve the parallelism of base level compaction
1256        // and reduce write amplification. However, at high level, the size of the sst file is often
1257        // larger and only contains the data of a single table_id, so there is no need to cut it.
1258        if compact_task.target_level > compact_task.base_level {
1259            return;
1260        }
1261
1262        // Apply split_weight_by_vnode based partition strategy
1263        self.apply_split_weight_by_vnode_partition(
1264            compact_task,
1265            compaction_config,
1266            compact_table_ids,
1267        );
1268    }
1269
1270    fn build_ready_compact_task(
1271        &self,
1272        picked_task: PickedCompactionTask,
1273        context: CompactTaskBuildContext,
1274        table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
1275        all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
1276    ) -> BuiltCompactTask {
1277        let compaction_config = context.compaction_config.clone();
1278        let (mut compact_task, compact_table_ids) = build_base_compact_task(picked_task, context);
1279
1280        if compact_task.is_trivial_reclaim() {
1281            compact_task.task_status = TaskStatus::Success;
1282            compact_task.sorted_output_ssts.clear();
1283            return BuiltCompactTask::MetaFinished(compact_task);
1284        }
1285
1286        if compact_task.is_trivial_move_task() {
1287            compact_task.task_status = TaskStatus::Success;
1288            compact_task.sorted_output_ssts = compact_task.input_ssts[0]
1289                .read_sstable_infos()
1290                .cloned()
1291                .collect();
1292            return BuiltCompactTask::MetaFinished(compact_task);
1293        }
1294
1295        self.prepare_compact_task_for_assignment(
1296            &mut compact_task,
1297            compaction_config.as_ref(),
1298            &compact_table_ids,
1299            safe_epoch_table_watermarks_impl(table_watermarks, &compact_table_ids),
1300            all_versioned_table_schemas,
1301        );
1302
1303        BuiltCompactTask::PendingAssignment(compact_task)
1304    }
1305
1306    fn prepare_compact_task_for_assignment(
1307        &self,
1308        compact_task: &mut CompactTask,
1309        compaction_config: &CompactionConfig,
1310        compact_table_ids: &[TableId],
1311        table_watermarks: BTreeMap<TableId, TableWatermarks>,
1312        all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
1313    ) {
1314        self.calculate_vnode_partition(compact_task, compaction_config, compact_table_ids);
1315        attach_compact_task_table_metadata(
1316            compact_task,
1317            compact_table_ids,
1318            table_watermarks,
1319            all_versioned_table_schemas,
1320        );
1321    }
1322
1323    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
1324        self.compactor_manager.clone()
1325    }
1326
1327    fn register_compaction_task_report_waiter(
1328        &self,
1329        task_id: HummockCompactionTaskId,
1330    ) -> Receiver<CompactionTaskReportResult> {
1331        let (tx, rx) = tokio::sync::oneshot::channel();
1332        self.compaction_task_report_notifiers
1333            .lock()
1334            .register(task_id, tx);
1335        rx
1336    }
1337
1338    fn remove_compaction_task_report_waiter(&self, task_id: HummockCompactionTaskId) {
1339        self.compaction_task_report_notifiers.lock().remove(task_id);
1340    }
1341
1342    fn notify_compaction_task_report_waiters(&self, results: Vec<CompactionTaskReportResult>) {
1343        let mut guard = self.compaction_task_report_notifiers.lock();
1344        for result in results {
1345            guard.notify(result);
1346        }
1347    }
1348}
1349
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Returns a clone of the assignment recorded for `task_id`, or `None` if there is none.
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        self.compaction
            .read()
            .await
            .compact_task_assignment
            .get(&task_id)
            .cloned()
    }

    /// Reports a task through `report_compact_tasks`, optionally overriding the stored task.
    ///
    /// When `compact_task` is given, it replaces the assignment stored for `task_id` (with
    /// context id `0`) before the report is submitted. The report itself carries no object
    /// timestamps, and the per-task flag returned by the report is discarded.
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(overriding_task) = compact_task {
            let assignment = CompactTaskAssignment {
                compact_task: overriding_task,
                context_id: 0.into(),
            };
            // The write guard is a temporary that is released at the end of this statement,
            // i.e. before the report below takes the lock again.
            self.compaction
                .write()
                .await
                .compact_task_assignment
                .insert(task_id, assignment);
        }

        // A test may have modified its copy of the compact task directly without touching the
        // entry in compact_task_assignment. The override above makes the stored assignment
        // match the modified task before it is reported via `report_compact_tasks`.
        let report = ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        };
        self.report_compact_tasks(vec![report]).await?;
        Ok(())
    }
}
1393
/// What triggered the compaction schedule request.
///
/// `CompactionState::try_sched_compaction` only looks at the trigger when the requested task
/// type is `Dynamic`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleTrigger {
    /// New data arrived (e.g., `commit_epoch`). Clears the group's cooldown and records the
    /// arrival time.
    NewData,
    /// Periodic timer. Respects cooldown for Dynamic type: the request is rejected while the
    /// group is in cooldown.
    Periodic,
}
1402
/// A point-in-time snapshot of the compaction schedule state.
///
/// `snapshot_time` is used by `unschedule()` to detect whether new data arrived
/// after the snapshot was taken, preventing incorrect cooldown.
pub struct CompactionScheduleSnapshot {
    /// Copy of the scheduled (`compaction_group`, `task_type`) pairs at snapshot time.
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    /// Instant at which the snapshot was taken.
    snapshot_time: Instant,
}
1411
1412impl CompactionScheduleSnapshot {
1413    /// Task type priority order for scheduling (checked first = higher priority).
1414    const TASK_TYPE_PRIORITY: &[TaskType] = &[
1415        TaskType::Dynamic,
1416        TaskType::SpaceReclaim,
1417        TaskType::Ttl,
1418        TaskType::Tombstone,
1419        TaskType::VnodeWatermark,
1420    ];
1421
1422    pub fn snapshot_time(&self) -> Instant {
1423        self.snapshot_time
1424    }
1425
1426    /// Pick compaction groups and task type from this snapshot.
1427    ///
1428    /// Returns groups in shuffled order. Non-Dynamic types have higher priority
1429    /// and return a single group; Dynamic groups are batched together.
1430    pub fn pick_compaction_groups_and_type(&self) -> Option<(Vec<CompactionGroupId>, TaskType)> {
1431        let group_ids = self.group_ids_shuffled();
1432        let mut normal_groups = vec![];
1433        for cg_id in group_ids {
1434            if let Some(pick_type) = self.pick_type(cg_id) {
1435                if pick_type == TaskType::Dynamic {
1436                    normal_groups.push(cg_id);
1437                } else if normal_groups.is_empty() {
1438                    return Some((vec![cg_id], pick_type));
1439                }
1440            }
1441        }
1442        if normal_groups.is_empty() {
1443            None
1444        } else {
1445            Some((normal_groups, TaskType::Dynamic))
1446        }
1447    }
1448
1449    fn group_ids_shuffled(&self) -> Vec<CompactionGroupId> {
1450        let mut group_ids: Vec<_> = self.scheduled.iter().map(|(g, _)| *g).unique().collect();
1451        group_ids.shuffle(&mut thread_rng());
1452        group_ids
1453    }
1454
1455    fn pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1456        Self::TASK_TYPE_PRIORITY
1457            .iter()
1458            .find(|t| self.scheduled.contains(&(group, **t)))
1459            .copied()
1460    }
1461}
1462
/// Tracks which (`compaction_group`, `task_type`) pairs are scheduled for compaction.
///
/// For `Dynamic` type, includes a cooldown mechanism: groups with no compaction work
/// are skipped by periodic triggers until new data arrives via `commit_epoch`.
#[derive(Debug, Default)]
pub struct CompactionState {
    /// The whole schedule state, guarded by a single mutex.
    inner: Mutex<CompactionStateInner>,
}
1471
/// Mutex-protected contents of `CompactionState`.
#[derive(Debug, Default)]
struct CompactionStateInner {
    /// Currently scheduled (`compaction_group`, `task_type`) pairs.
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    /// Groups skipped by periodic Dynamic trigger until new data arrives.
    dynamic_cooldown: HashSet<CompactionGroupId>,
    /// Tracks new-data arrival time per group for cooldown race detection.
    last_new_data_time: HashMap<CompactionGroupId, Instant>,
}
1480
1481impl CompactionState {
1482    pub fn new() -> Self {
1483        Self {
1484            inner: Default::default(),
1485        }
1486    }
1487
1488    /// Enqueues a compaction request. Returns `true` if newly scheduled.
1489    ///
1490    /// `trigger` only affects `Dynamic` type — see [`ScheduleTrigger`].
1491    pub fn try_sched_compaction(
1492        &self,
1493        compaction_group: CompactionGroupId,
1494        task_type: TaskType,
1495        trigger: ScheduleTrigger,
1496    ) -> bool {
1497        let mut guard = self.inner.lock();
1498        if task_type == TaskType::Dynamic {
1499            match trigger {
1500                ScheduleTrigger::NewData => {
1501                    guard.dynamic_cooldown.remove(&compaction_group);
1502                    guard
1503                        .last_new_data_time
1504                        .insert(compaction_group, Instant::now());
1505                }
1506                ScheduleTrigger::Periodic => {
1507                    if guard.dynamic_cooldown.contains(&compaction_group) {
1508                        return false;
1509                    }
1510                }
1511            }
1512        }
1513        guard.scheduled.insert((compaction_group, task_type))
1514    }
1515
1516    /// Removes a scheduled entry. For Dynamic type, adds to cooldown unless
1517    /// new data arrived after `snapshot_time`.
1518    pub fn unschedule(
1519        &self,
1520        compaction_group: CompactionGroupId,
1521        task_type: compact_task::TaskType,
1522        snapshot_time: Instant,
1523    ) {
1524        let mut guard = self.inner.lock();
1525        guard.scheduled.remove(&(compaction_group, task_type));
1526        if task_type == TaskType::Dynamic {
1527            let has_new_data = guard
1528                .last_new_data_time
1529                .get(&compaction_group)
1530                .is_some_and(|t| *t > snapshot_time);
1531            if !has_new_data {
1532                guard.dynamic_cooldown.insert(compaction_group);
1533            }
1534        }
1535    }
1536
1537    /// Takes a snapshot of the current schedule state.
1538    pub fn snapshot(&self) -> CompactionScheduleSnapshot {
1539        let guard = self.inner.lock();
1540        // Record time after lock to ensure accurate ordering vs. try_sched_compaction
1541        let snapshot_time = Instant::now();
1542        CompactionScheduleSnapshot {
1543            scheduled: guard.scheduled.clone(),
1544            snapshot_time,
1545        }
1546    }
1547
1548    /// Removes all schedule state for a deleted or merged group.
1549    pub fn remove_compaction_group(&self, compaction_group: CompactionGroupId) {
1550        let mut guard = self.inner.lock();
1551        guard
1552            .scheduled
1553            .retain(|(group, _)| *group != compaction_group);
1554        guard.dynamic_cooldown.remove(&compaction_group);
1555        guard.last_new_data_time.remove(&compaction_group);
1556    }
1557}
1558
1559impl Compaction {
1560    pub fn get_compact_task_assignments_by_group_id(
1561        &self,
1562        compaction_group_id: CompactionGroupId,
1563    ) -> Vec<CompactTaskAssignment> {
1564        self.compact_task_assignment
1565            .values()
1566            .filter_map(|assignment| {
1567                if assignment.compact_task.compaction_group_id == compaction_group_id {
1568                    Some(assignment.clone())
1569                } else {
1570                    None
1571                }
1572            })
1573            .collect()
1574    }
1575}
1576
/// Statistics for one compaction group, bundled with that group's
/// `CompactionGroup` value.
#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    /// Identifier of the compaction group these statistics describe.
    pub group_id: CompactionGroupId,
    /// Aggregate size of the group.
    // NOTE(review): presumably bytes — confirm against the code that fills it.
    pub group_size: u64,
    /// Per-state-table statistic, keyed by state table id.
    // NOTE(review): presumably each table's size — confirm against the producer.
    pub table_statistic: BTreeMap<StateTableId, u64>,
    /// The compaction group (and thus its configuration) the statistics refer to.
    pub compaction_group_config: CompactionGroup,
}
1584
1585/// Updates table stats caused by vnode watermark trivial reclaim compaction.
1586fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1587    table_stats: &mut PbTableStatsMap,
1588    task: &CompactTask,
1589) {
1590    if task.task_type != TaskType::VnodeWatermark {
1591        return;
1592    }
1593    let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
1594    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1595        assert_eq!(s.table_ids.len(), 1);
1596        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1597        *e += s.total_key_count;
1598    }
1599    for (table_id, delete_count) in deleted_table_keys {
1600        let Some(stats) = table_stats.get_mut(&table_id) else {
1601            continue;
1602        };
1603        if stats.total_key_count == 0 {
1604            continue;
1605        }
1606        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1607        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1608        // total_key_count is updated accurately.
1609        stats.total_key_count = new_total_key_count;
1610        // others are updated approximately.
1611        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1612        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1613    }
1614}
1615
/// Health classification of a single compaction group.
///
/// The `String` carried by the non-normal variants is a human-readable
/// explanation of why the group is in that state.
#[derive(Debug, Clone)]
pub enum GroupState {
    /// The compaction group is not in emergency state.
    Normal,

    /// The compaction group is in emergency state.
    Emergency(String), // reason

    /// The compaction group is in write stop state.
    WriteStop(String), // reason
}

impl GroupState {
    /// `true` only for [`GroupState::WriteStop`].
    pub fn is_write_stop(&self) -> bool {
        match self {
            Self::WriteStop(_) => true,
            Self::Normal | Self::Emergency(_) => false,
        }
    }

    /// `true` only for [`GroupState::Emergency`].
    pub fn is_emergency(&self) -> bool {
        match self {
            Self::Emergency(_) => true,
            Self::Normal | Self::WriteStop(_) => false,
        }
    }

    /// The explanation attached to a non-normal state, or `None` for
    /// [`GroupState::Normal`].
    pub fn reason(&self) -> Option<&str> {
        match self {
            Self::Normal => None,
            Self::Emergency(why) | Self::WriteStop(why) => Some(why.as_str()),
        }
    }
}
1644
1645#[derive(Clone, Default)]
1646pub struct GroupStateValidator;
1647
1648impl GroupStateValidator {
1649    pub fn write_stop_sub_level_count(
1650        level_count: usize,
1651        compaction_config: &CompactionConfig,
1652    ) -> bool {
1653        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1654        level_count > threshold
1655    }
1656
1657    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1658        l0_size
1659            > compaction_config
1660                .level0_stop_write_threshold_max_size
1661                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1662    }
1663
1664    pub fn write_stop_l0_file_count(
1665        l0_file_count: usize,
1666        compaction_config: &CompactionConfig,
1667    ) -> bool {
1668        l0_file_count
1669            > compaction_config
1670                .level0_stop_write_threshold_max_sst_count
1671                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1672                as usize
1673    }
1674
1675    pub fn emergency_l0_file_count(
1676        l0_file_count: usize,
1677        compaction_config: &CompactionConfig,
1678    ) -> bool {
1679        l0_file_count
1680            > compaction_config
1681                .emergency_level0_sst_file_count
1682                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1683                as usize
1684    }
1685
1686    pub fn emergency_l0_partition_count(
1687        last_l0_sub_level_partition_count: usize,
1688        compaction_config: &CompactionConfig,
1689    ) -> bool {
1690        last_l0_sub_level_partition_count
1691            > compaction_config
1692                .emergency_level0_sub_level_partition
1693                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1694                as usize
1695    }
1696
1697    pub fn check_single_group_write_stop(
1698        levels: &Levels,
1699        compaction_config: &CompactionConfig,
1700    ) -> GroupState {
1701        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1702            return GroupState::WriteStop(format!(
1703                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1704                levels.l0.sub_levels.len(),
1705                compaction_config.level0_stop_write_threshold_sub_level_number
1706            ));
1707        }
1708
1709        if Self::write_stop_l0_file_count(
1710            levels
1711                .l0
1712                .sub_levels
1713                .iter()
1714                .map(|l| l.table_infos.len())
1715                .sum(),
1716            compaction_config,
1717        ) {
1718            return GroupState::WriteStop(format!(
1719                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1720                levels
1721                    .l0
1722                    .sub_levels
1723                    .iter()
1724                    .map(|l| l.table_infos.len())
1725                    .sum::<usize>(),
1726                compaction_config
1727                    .level0_stop_write_threshold_max_sst_count
1728                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1729            ));
1730        }
1731
1732        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1733            return GroupState::WriteStop(format!(
1734                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1735                levels.l0.total_file_size,
1736                compaction_config
1737                    .level0_stop_write_threshold_max_size
1738                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1739            ));
1740        }
1741
1742        GroupState::Normal
1743    }
1744
1745    pub fn check_single_group_emergency(
1746        levels: &Levels,
1747        compaction_config: &CompactionConfig,
1748    ) -> GroupState {
1749        if Self::emergency_l0_file_count(
1750            levels
1751                .l0
1752                .sub_levels
1753                .iter()
1754                .map(|l| l.table_infos.len())
1755                .sum(),
1756            compaction_config,
1757        ) {
1758            return GroupState::Emergency(format!(
1759                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1760                levels
1761                    .l0
1762                    .sub_levels
1763                    .iter()
1764                    .map(|l| l.table_infos.len())
1765                    .sum::<usize>(),
1766                compaction_config
1767                    .emergency_level0_sst_file_count
1768                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1769            ));
1770        }
1771
1772        if Self::emergency_l0_partition_count(
1773            levels
1774                .l0
1775                .sub_levels
1776                .first()
1777                .map(|l| l.table_infos.len())
1778                .unwrap_or(0),
1779            compaction_config,
1780        ) {
1781            return GroupState::Emergency(format!(
1782                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1783                levels
1784                    .l0
1785                    .sub_levels
1786                    .first()
1787                    .map(|l| l.table_infos.len())
1788                    .unwrap_or(0),
1789                compaction_config
1790                    .emergency_level0_sub_level_partition
1791                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1792            ));
1793        }
1794
1795        GroupState::Normal
1796    }
1797
1798    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1799        let state = Self::check_single_group_write_stop(levels, compaction_config);
1800        if state.is_write_stop() {
1801            return state;
1802        }
1803
1804        Self::check_single_group_emergency(levels, compaction_config)
1805    }
1806}
1807
#[cfg(test)]
mod prefetched_task_id_tests {
    use crate::manager::MetaOpts;

    /// Pins the value of `compaction_task_id_refill_capacity` produced by
    /// `MetaOpts::test(false)`.
    #[test]
    fn test_compaction_task_id_refill_capacity_default() {
        let opts = MetaOpts::test(false);
        assert_eq!(opts.compaction_task_id_refill_capacity, 64);
    }
}
1817
#[cfg(test)]
mod compaction_state_tests {
    use risingwave_pb::hummock::compact_task::TaskType;

    use super::*;

    /// Whether `group` is currently in the dynamic cooldown set.
    ///
    /// Takes the state lock itself, so it must not be called while a guard
    /// obtained from `state.inner.lock()` is still alive.
    fn in_cooldown(state: &CompactionState, group: CompactionGroupId) -> bool {
        state.inner.lock().dynamic_cooldown.contains(&group)
    }

    /// Schedules a `Dynamic` task for `group` through a `NewData` trigger.
    fn sched_new_data(state: &CompactionState, group: CompactionGroupId) -> bool {
        state.try_sched_compaction(group, TaskType::Dynamic, ScheduleTrigger::NewData)
    }

    /// Schedules `task_type` for `group` through a `Periodic` trigger.
    fn sched_periodic(
        state: &CompactionState,
        group: CompactionGroupId,
        task_type: TaskType,
    ) -> bool {
        state.try_sched_compaction(group, task_type, ScheduleTrigger::Periodic)
    }

    /// Takes a snapshot and immediately unschedules `(group, task_type)`
    /// using that snapshot's time.
    fn snapshot_and_unschedule(
        state: &CompactionState,
        group: CompactionGroupId,
        task_type: TaskType,
    ) {
        let snapshot = state.snapshot();
        state.unschedule(group, task_type, snapshot.snapshot_time());
    }

    #[test]
    fn test_basic_schedule_and_unschedule() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();
        let dynamic_entry = (group_id, TaskType::Dynamic);
        let ttl_entry = (group_id, TaskType::Ttl);

        // The first request is accepted, an identical one is a duplicate,
        // and a different task type for the same group is accepted again.
        assert!(sched_new_data(&state, group_id));
        assert!(!sched_new_data(&state, group_id));
        assert!(sched_periodic(&state, group_id, TaskType::Ttl));

        // Both entries are visible in a snapshot.
        let before = state.snapshot();
        assert!(before.scheduled.contains(&dynamic_entry));
        assert!(before.scheduled.contains(&ttl_entry));

        // Unscheduling removes only the requested entry.
        state.unschedule(group_id, TaskType::Dynamic, before.snapshot_time());
        let after = state.snapshot();
        assert!(!after.scheduled.contains(&dynamic_entry));
        assert!(after.scheduled.contains(&ttl_entry));
    }

    #[test]
    fn test_cooldown_blocks_periodic_trigger() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Schedule + unschedule without new data in between => cooldown.
        assert!(sched_new_data(&state, group_id));
        snapshot_and_unschedule(&state, group_id, TaskType::Dynamic);
        assert!(in_cooldown(&state, group_id));

        // A periodic trigger is rejected while the group cools down.
        assert!(!sched_periodic(&state, group_id, TaskType::Dynamic));
    }

    #[test]
    fn test_new_data_clears_cooldown() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Bring the group into cooldown first.
        assert!(sched_new_data(&state, group_id));
        snapshot_and_unschedule(&state, group_id, TaskType::Dynamic);
        assert!(in_cooldown(&state, group_id));

        // New data both schedules the group and lifts the cooldown.
        assert!(sched_new_data(&state, group_id));
        assert!(!in_cooldown(&state, group_id));
    }

    #[test]
    fn test_cooldown_only_affects_dynamic_type() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Group 1 enters cooldown via the Dynamic type.
        assert!(sched_new_data(&state, group_id));
        snapshot_and_unschedule(&state, group_id, TaskType::Dynamic);

        // Unscheduling a Ttl entry must not create a cooldown.
        let group_id_2: CompactionGroupId = 2.into();
        assert!(sched_periodic(&state, group_id_2, TaskType::Ttl));
        snapshot_and_unschedule(&state, group_id_2, TaskType::Ttl);
        assert!(!in_cooldown(&state, group_id_2));

        // Non-dynamic types of the cooled-down group are still schedulable.
        assert!(sched_periodic(&state, group_id, TaskType::Ttl));
        assert!(sched_periodic(&state, group_id, TaskType::SpaceReclaim));
    }

    #[test]
    fn test_race_condition_new_data_after_snapshot() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(sched_new_data(&state, group_id));
        let snapshot = state.snapshot();

        // Pretend new data showed up after the snapshot was taken.
        state
            .inner
            .lock()
            .last_new_data_time
            .insert(group_id, Instant::now());

        // The later new-data timestamp must prevent the cooldown.
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(
            !in_cooldown(&state, group_id),
            "Should skip cooldown when new data arrived after snapshot"
        );
    }

    #[test]
    fn test_remove_compaction_group_cleans_all_state() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Populate every piece of per-group state.
        assert!(sched_new_data(&state, group_id));
        assert!(sched_periodic(&state, group_id, TaskType::Ttl));
        state.inner.lock().dynamic_cooldown.insert(group_id);

        state.remove_compaction_group(group_id);

        // Nothing about the group may remain.
        let inner = state.inner.lock();
        for task_type in [TaskType::Dynamic, TaskType::Ttl] {
            assert!(!inner.scheduled.contains(&(group_id, task_type)));
        }
        assert!(!inner.dynamic_cooldown.contains(&group_id));
        assert!(!inner.last_new_data_time.contains_key(&group_id));
    }

    #[test]
    fn test_snapshot_pick_type_priority() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Nothing scheduled yet.
        assert_eq!(state.snapshot().pick_type(group_id), None);

        // Priority order: Dynamic > SpaceReclaim > Ttl > Tombstone > VnodeWatermark.
        // Scheduling from lowest to highest priority, the most recently added
        // type must win every time.
        let lowest_to_highest = [
            (TaskType::VnodeWatermark, ScheduleTrigger::Periodic),
            (TaskType::Tombstone, ScheduleTrigger::Periodic),
            (TaskType::Ttl, ScheduleTrigger::Periodic),
            (TaskType::SpaceReclaim, ScheduleTrigger::Periodic),
            (TaskType::Dynamic, ScheduleTrigger::NewData),
        ];
        for (task_type, trigger) in lowest_to_highest {
            state.try_sched_compaction(group_id, task_type, trigger);
            assert_eq!(state.snapshot().pick_type(group_id), Some(task_type));
        }
    }

    #[test]
    fn test_multiple_groups_independent_cooldown() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        sched_new_data(&state, g1);
        sched_new_data(&state, g2);

        // Only g1 is unscheduled, so only g1 may cool down.
        snapshot_and_unschedule(&state, g1, TaskType::Dynamic);

        assert!(in_cooldown(&state, g1));
        assert!(!in_cooldown(&state, g2));
    }

    #[test]
    fn test_pick_compaction_groups_empty() {
        // With nothing scheduled there is nothing to pick.
        let snapshot = CompactionState::new().snapshot();
        assert!(snapshot.pick_compaction_groups_and_type().is_none());
    }

    #[test]
    fn test_pick_compaction_groups_mixed_types() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();
        let g3: CompactionGroupId = 3.into();

        // g1 and g3 are Dynamic, g2 is Ttl.
        sched_new_data(&state, g1);
        sched_periodic(&state, g2, TaskType::Ttl);
        sched_new_data(&state, g3);

        let (groups, task_type) = state
            .snapshot()
            .pick_compaction_groups_and_type()
            .unwrap();

        // The pick is shuffled, so two outcomes are legal:
        // - the Ttl group comes first => exactly ([g2], Ttl);
        // - a Dynamic group comes first => all Dynamic groups, no Ttl group.
        match task_type {
            TaskType::Dynamic => {
                assert!(groups.contains(&g1));
                assert!(groups.contains(&g3));
                assert!(!groups.contains(&g2)); // Ttl group excluded from Dynamic result
            }
            other => {
                assert_eq!(other, TaskType::Ttl);
                assert_eq!(groups, vec![g2]);
            }
        }
    }

    #[test]
    fn test_pick_compaction_groups_all_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        sched_new_data(&state, g1);
        sched_new_data(&state, g2);

        let (groups, task_type) = state
            .snapshot()
            .pick_compaction_groups_and_type()
            .unwrap();
        assert_eq!(task_type, TaskType::Dynamic);
        assert!(groups.contains(&g1));
        assert!(groups.contains(&g2));
    }

    #[test]
    fn test_pick_compaction_groups_single_non_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();

        sched_periodic(&state, g1, TaskType::SpaceReclaim);

        let (groups, task_type) = state
            .snapshot()
            .pick_compaction_groups_and_type()
            .unwrap();
        assert_eq!(task_type, TaskType::SpaceReclaim);
        assert_eq!(groups, vec![g1]);
    }
}