risingwave_meta/hummock/manager/compaction/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17use std::time::Instant;
18
19use anyhow::Context;
20use compaction_event_loop::{
21    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
22    HummockCompactorDedicatedEventLoop,
23};
24use fail::fail_point;
25use itertools::Itertools;
26use parking_lot::Mutex;
27use rand::rng as thread_rng;
28use rand::seq::SliceRandom;
29use risingwave_common::catalog::TableId;
30use risingwave_common::config::meta::default::compaction_config;
31use risingwave_common::util::epoch::Epoch;
32use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
33use risingwave_hummock_sdk::compaction_group::StateTableId;
34use risingwave_hummock_sdk::compaction_group::hummock_version_ext::safe_epoch_table_watermarks_impl;
35use risingwave_hummock_sdk::level::Levels;
36use risingwave_hummock_sdk::sstable_info::SstableInfo;
37use risingwave_hummock_sdk::table_stats::{
38    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
39};
40use risingwave_hummock_sdk::table_watermark::TableWatermarks;
41use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
42use risingwave_hummock_sdk::{
43    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
44    HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
45};
46use risingwave_meta_model::hummock_sequence::COMPACTION_TASK_ID;
47use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
48use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
49use risingwave_pb::hummock::{
50    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
51    SubscribeCompactionEventRequest, TableOption, compact_task,
52};
53use thiserror_ext::AsReport;
54use tokio::sync::RwLockWriteGuard;
55use tokio::sync::mpsc::UnboundedReceiver;
56use tokio::sync::oneshot::{Receiver, Sender};
57use tokio::task::JoinHandle;
58use tonic::Streaming;
59use tracing::warn;
60
61use crate::hummock::compaction::selector::level_selector::PickerInfo;
62use crate::hummock::compaction::selector::{
63    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
64    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
65    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
66};
67use crate::hummock::compaction::{
68    CompactStatus, CompactionDeveloperConfig, CompactionSelector,
69    CompactionTask as PickedCompactionTask,
70};
71use crate::hummock::error::{Error, Result};
72use crate::hummock::manager::CompactionTaskReportResult;
73use crate::hummock::manager::compaction::compact_task_builder::{
74    CompactTaskBuildContext, attach_compact_task_table_metadata, build_base_compact_task,
75};
76use crate::hummock::manager::transaction::{
77    HummockVersionStatsTransaction, HummockVersionTransaction,
78};
79use crate::hummock::manager::versioning::Versioning;
80use crate::hummock::metrics_utils::{
81    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
82    trigger_local_table_stat,
83};
84use crate::hummock::model::CompactionGroup;
85use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
86use crate::manager::META_NODE_ID;
87use crate::model::BTreeMapTransaction;
88
/// Outcome of asking the manager to trigger a manual compaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ManualCompactionTriggerResult {
    /// A compaction task was submitted.
    Submitted,
    /// Nothing was submitted this time; the caller may try again.
    Retry,
}
94
95mod compact_task_builder;
96pub mod compaction_event_loop;
97pub mod compaction_group_manager;
98pub mod compaction_group_schedule;
99
100static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
101    [
102        TaskStatus::ManualCanceled,
103        TaskStatus::SendFailCanceled,
104        TaskStatus::AssignFailCanceled,
105        TaskStatus::HeartbeatCanceled,
106        TaskStatus::InvalidGroupCanceled,
107        TaskStatus::NoAvailMemoryResourceCanceled,
108        TaskStatus::NoAvailCpuResourceCanceled,
109        TaskStatus::HeartbeatProgressCanceled,
110    ]
111    .into_iter()
112    .collect()
113});
114
115fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
116    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
117        HashMap::default();
118    compaction_selectors.insert(
119        compact_task::TaskType::Dynamic,
120        Box::<DynamicLevelSelector>::default(),
121    );
122    compaction_selectors.insert(
123        compact_task::TaskType::SpaceReclaim,
124        Box::<SpaceReclaimCompactionSelector>::default(),
125    );
126    compaction_selectors.insert(
127        compact_task::TaskType::Ttl,
128        Box::<TtlCompactionSelector>::default(),
129    );
130    compaction_selectors.insert(
131        compact_task::TaskType::Tombstone,
132        Box::<TombstoneCompactionSelector>::default(),
133    );
134    compaction_selectors.insert(
135        compact_task::TaskType::VnodeWatermark,
136        Box::<VnodeWatermarkCompactionSelector>::default(),
137    );
138    compaction_selectors
139}
140
/// Result of turning a picked compaction candidate into a concrete `CompactTask`
/// (produced by `build_ready_compact_task` and consumed in `get_compact_tasks_impl`).
enum BuiltCompactTask {
    /// The task needs no compactor (e.g. a trivial move / trivial reclaim). The caller reports
    /// it against the group's `CompactStatus` and applies it to the version right away.
    MetaFinished(CompactTask),
    /// The task must run on a compactor. The caller records it in `compact_task_assignment`
    /// and returns it for dispatch.
    PendingAssignment(CompactTask),
}
145
146impl HummockVersionTransaction<'_> {
147    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
148        let mut version_delta = self.new_delta();
149        let trivial_move = compact_task.is_trivial_move_task();
150        version_delta.trivial_move = trivial_move;
151
152        let group_deltas = &mut version_delta
153            .group_deltas
154            .entry(compact_task.compaction_group_id)
155            .or_default()
156            .group_deltas;
157        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
158            BTreeMap::default();
159
160        for level in &compact_task.input_ssts {
161            let level_idx = level.level_idx;
162
163            removed_table_ids_map
164                .entry(level_idx)
165                .or_default()
166                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
167        }
168
169        for (level_idx, removed_table_ids) in removed_table_ids_map {
170            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
171                level_idx,
172                0, // default
173                removed_table_ids,
174                vec![], // default
175                0,      // default
176                compact_task.compaction_group_version_id,
177            ));
178
179            group_deltas.push(group_delta);
180        }
181
182        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
183            compact_task.target_level,
184            compact_task.target_sub_level_id,
185            HashSet::new(), // default
186            compact_task.sorted_output_ssts.clone(),
187            compact_task.split_weight_by_vnode,
188            compact_task.compaction_group_version_id,
189        ));
190
191        group_deltas.push(group_delta);
192        version_delta.pre_apply();
193    }
194}
195
/// In-memory compaction bookkeeping, held behind `HummockManager::compaction` and mutated
/// through `BTreeMapTransaction`s in `get_compact_tasks_impl` / `report_compact_tasks_impl`.
#[derive(Default)]
pub struct Compaction {
    /// Compaction tasks handed out for execution, keyed by task id.
    /// `get_compact_tasks_impl` inserts entries with `context_id: META_NODE_ID`; that field is
    /// marked deprecated there.
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// `CompactStatus` of each compaction group. It is lazily created the first time a group
    /// is considered for picking.
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    // NOTE(review): not read in this file's visible code. The deterministic-test switch used
    // here is `env.opts.compaction_deterministic_test`, and the leading underscore suggests this
    // field is unused — confirm before relying on it.
    pub _deterministic_mode: bool,
}
205
206impl HummockManager {
207    pub async fn get_assigned_compact_task_num(&self) -> u64 {
208        self.compaction.read().await.compact_task_assignment.len() as u64
209    }
210
211    pub async fn list_compaction_status(
212        &self,
213    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
214        let compaction = self.compaction.read().await;
215        (
216            compaction.compaction_statuses.values().map_into().collect(),
217            compaction
218                .compact_task_assignment
219                .values()
220                .cloned()
221                .collect(),
222        )
223    }
224
225    pub async fn get_compaction_scores(
226        &self,
227        compaction_group_id: CompactionGroupId,
228    ) -> Vec<PickerInfo> {
229        let (status, levels, group) = {
230            let compaction = self.compaction.read().await;
231            let versioning = self.versioning.read().await;
232            let config_manager = self.compaction_group_manager.read().await;
233            match (
234                compaction.compaction_statuses.get(&compaction_group_id),
235                versioning.current_version.levels.get(&compaction_group_id),
236                config_manager.try_get_compaction_group_config(compaction_group_id),
237            ) {
238                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
239                _ => {
240                    return vec![];
241                }
242            }
243        };
244        let dynamic_level_core = DynamicLevelSelectorCore::new(
245            group.compaction_config,
246            Arc::new(CompactionDeveloperConfig::default()),
247        );
248        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
249        ctx.score_levels
250    }
251}
252
253impl HummockManager {
254    pub fn compaction_event_loop(
255        hummock_manager: Arc<Self>,
256        compactor_streams_change_rx: UnboundedReceiver<(
257            HummockContextId,
258            Streaming<SubscribeCompactionEventRequest>,
259        )>,
260    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
261        let mut join_handle_vec = Vec::default();
262
263        let hummock_compaction_event_handler =
264            HummockCompactionEventHandler::new(hummock_manager.clone());
265
266        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
267            hummock_manager.clone(),
268            hummock_compaction_event_handler.clone(),
269        );
270
271        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
272        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));
273
274        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
275            hummock_manager.env.opts.clone(),
276            hummock_compaction_event_handler,
277            Some(event_tx),
278        );
279
280        let event_loop = HummockCompactionEventLoop::new(
281            hummock_compaction_event_dispatcher,
282            hummock_manager.metrics.clone(),
283            compactor_streams_change_rx,
284        );
285
286        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
287        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
288
289        join_handle_vec
290    }
291
292    pub fn add_compactor_stream(
293        &self,
294        context_id: HummockContextId,
295        req_stream: Streaming<SubscribeCompactionEventRequest>,
296    ) {
297        self.compactor_streams_change_tx
298            .send((context_id, req_stream))
299            .unwrap();
300    }
301}
302
303impl HummockManager {
304    /// Gets one compaction task id with best-effort batching while ensuring concurrent callers
305    /// share the same refill instead of wasting an allocated range.
306    async fn next_compaction_task_id_with_prefetch(&self, refill_capacity: u32) -> Result<u64> {
307        self.prefetched_compaction_task_ids
308            .next(refill_capacity, |count| async move {
309                self.env
310                    .hummock_seq
311                    .next_interval(COMPACTION_TASK_ID, count)
312                    .await
313            })
314            .await
315    }
316
    /// Picks compaction tasks from `compaction_groups` (visited in the given order) using
    /// `selector`, stopping once `max_select_count` compactor tasks have been picked.
    ///
    /// For each group, candidates are picked repeatedly:
    /// - a task finished inside meta (`BuiltCompactTask::MetaFinished`, e.g. a trivial move) is
    ///   reported to the group's `CompactStatus` and applied to the version immediately, and
    ///   picking continues;
    /// - the first task that needs a compactor (`BuiltCompactTask::PendingAssignment`) is
    ///   recorded in `compact_task_assignment` and ends picking for that group.
    ///
    /// The whole loop stops early once `max_trivial_move_task_count_per_loop` meta-finished
    /// tasks have been collected.
    ///
    /// Holds the `compaction` and `versioning` write locks while picking. `versioning` is
    /// released before `compaction`.
    ///
    /// Returns `(tasks, unscheduled_groups)`:
    /// - `tasks` holds the compactor tasks followed by the meta-finished tasks.
    /// - `unscheduled_groups` lists the examined groups that yielded no compactor task. Groups
    ///   skipped for missing levels or config are not listed.
    ///
    /// # Errors
    /// Fails if loading versioned table schemas, allocating a task id, or committing to the
    /// meta store fails.
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut dyn CompactionSelector,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        // Lock order: `compaction` first, then `versioning` (same as `report_compact_tasks`).
        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        // All changes below are staged in transactions and committed together via
        // `commit_multi_var!` at the end.
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        // Deterministic-test mode: see `HummockVersionTransaction::disable_apply_to_txn`.
        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        // Versioned table schemas are only loaded when dropped-column reclaim is enabled;
        // otherwise task building receives an empty map.
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        // Reuse prefetched task ids from previous loops.
        // Each group consumes at most one task_id (trivial tasks share the same id with normal
        // task). When prefetched ids are exhausted, refill in fixed-size chunks to avoid
        // per-group SQL transactions while keeping the in-memory cache small.
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // When the last table of a compaction group is deleted, the compaction group (and its
            // config) is destroyed as well. Then a compaction task for this group may come later and
            // cannot find its config.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            // Use prefetched task id if available; when exhausted, refill in chunks first.
            let task_id = self
                .next_compaction_task_id_with_prefetch(
                    self.env.opts.compaction_task_id_refill_capacity,
                )
                .await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                // lazy initialize.
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            // Snapshot the table options of this group's member tables; the option map's lock
            // is held only inside this scope.
            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            // Keep picking from this group until the selector yields nothing, a compactor task
            // is produced, or the per-loop trivial-task cap is reached.
            while let Some(picked_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let compaction_group_levels = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id);
                let target_level_id = picked_task.input.target_level as u32;
                let is_target_level_last = compaction_group_levels.is_last_level(target_level_id);
                let table_options = table_id_to_option
                    .iter()
                    .map(|(table_id, table_option)| (*table_id, TableOption::from(table_option)))
                    .collect();
                let built_compact_task = self.build_ready_compact_task(
                    picked_task,
                    CompactTaskBuildContext {
                        task_id,
                        compaction_group_id: group_config.group_id,
                        compaction_group_version_id: compaction_group_levels
                            .compaction_group_version_id,
                        existing_table_ids: member_table_ids.clone(),
                        table_options,
                        is_target_level_last,
                        compaction_config: group_config.compaction_config.clone(),
                        current_epoch_time: Epoch::now().0,
                    },
                    &version.latest_version().table_watermarks,
                    &all_versioned_table_schemas,
                );

                match built_compact_task {
                    // Finished inside meta: report it against the group's status, fold its
                    // stats change, and apply it to the version within this same transaction.
                    BuiltCompactTask::MetaFinished(compact_task) => {
                        let label = compact_task.task_label();
                        tracing::debug!(
                            "{} for compaction group {}: input: {:?}, cost time: {:?}",
                            label,
                            compact_task.compaction_group_id,
                            compact_task.input_ssts,
                            start_time.elapsed()
                        );
                        compact_status.report_compact_task(&compact_task);
                        update_table_stats_for_vnode_watermark_trivial_reclaim(
                            &mut version_stats.table_stats,
                            &compact_task,
                        );
                        self.metrics
                            .compact_frequency
                            .with_label_values(&[
                                label,
                                &compact_task.compaction_group_id.to_string(),
                                selector.task_type().as_str_name(),
                                "SUCCESS",
                            ])
                            .inc();

                        version.apply_compact_task(&compact_task);
                        trivial_tasks.push(compact_task);
                        if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop
                        {
                            break 'outside;
                        }
                    }
                    // Needs a compactor: record the assignment and stop picking for this group.
                    BuiltCompactTask::PendingAssignment(compact_task) => {
                        compact_task_assignment.insert(
                            compact_task.task_id,
                            CompactTaskAssignment {
                                compact_task: Some(compact_task.clone().into()),
                                context_id: META_NODE_ID, // deprecated
                            },
                        );

                        pick_tasks.push(compact_task);
                        break;
                    }
                }

                // Flush this pick's selector statistics and start fresh for the next pick.
                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            // The group counts as unscheduled unless the most recent compactor task came from it.
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            // Meta-finished tasks changed the version and stats, so commit them together with
            // the compaction state.
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // We are using a single transaction to ensure that each task has progress when it is
            // created.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Initiate heartbeat for the task to track its progress.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // Mark the picked task `Pending`: it has been handed out but not executed yet.
            // Note: the heartbeat above was initiated with the clone taken before this change.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            // The metrics below index `input_ssts[0]`, which assumes every picked task has at
            // least one input level.
            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        // Compactor tasks first, then the tasks already finished inside meta.
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }
641
642    /// Cancels a compaction task no matter it's assigned or unassigned.
643    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
644        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
645            anyhow::anyhow!("failpoint metastore err")
646        )));
647        let ret = self
648            .cancel_compact_task_impl(vec![task_id], task_status)
649            .await?;
650        Ok(ret[0])
651    }
652
    /// Cancels every task id in `tasks` using the same cancel `task_status`.
    ///
    /// Returns one flag per task, as produced by `report_compact_tasks` (presumably in input
    /// order — confirm against `report_compact_tasks_impl`).
    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }
660
661    async fn cancel_compact_task_impl(
662        &self,
663        task_ids: Vec<u64>,
664        task_status: TaskStatus,
665    ) -> Result<Vec<bool>> {
666        assert!(CANCEL_STATUS_SET.contains(&task_status));
667        let tasks = task_ids
668            .into_iter()
669            .map(|task_id| ReportTask {
670                task_id,
671                task_status,
672                sorted_output_ssts: vec![],
673                table_stats_change: HashMap::default(),
674                object_timestamps: HashMap::default(),
675            })
676            .collect_vec();
677        let rets = self.report_compact_tasks(tasks).await?;
678        #[cfg(test)]
679        {
680            self.check_state_consistency().await;
681        }
682        Ok(rets)
683    }
684
685    async fn get_compact_tasks(
686        &self,
687        mut compaction_groups: Vec<CompactionGroupId>,
688        max_select_count: usize,
689        selector: &mut dyn CompactionSelector,
690    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
691        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
692            anyhow::anyhow!("failpoint metastore error")
693        )));
694        compaction_groups.shuffle(&mut thread_rng());
695        let (mut tasks, groups) = self
696            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
697            .await?;
698        tasks.retain(|task| {
699            if task.task_status == TaskStatus::Success {
700                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
701                false
702            } else {
703                true
704            }
705        });
706        Ok((tasks, groups))
707    }
708
709    pub async fn get_compact_task(
710        &self,
711        compaction_group_id: CompactionGroupId,
712        selector: &mut dyn CompactionSelector,
713    ) -> Result<Option<CompactTask>> {
714        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
715            anyhow::anyhow!("failpoint metastore error")
716        )));
717
718        let (normal_tasks, _) = self
719            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
720            .await?;
721        for task in normal_tasks {
722            if task.task_status != TaskStatus::Success {
723                return Ok(Some(task));
724            }
725            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
726        }
727        Ok(None)
728    }
729
730    pub async fn manual_get_compact_task(
731        &self,
732        compaction_group_id: CompactionGroupId,
733        manual_compaction_option: ManualCompactionOption,
734    ) -> Result<Option<CompactTask>> {
735        let (task, _) = self
736            .manual_get_compact_task_with_info(compaction_group_id, manual_compaction_option)
737            .await?;
738        Ok(task)
739    }
740
741    pub async fn manual_get_compact_task_with_info(
742        &self,
743        compaction_group_id: CompactionGroupId,
744        manual_compaction_option: ManualCompactionOption,
745    ) -> Result<(Option<CompactTask>, bool)> {
746        let mut selector = ManualCompactionSelector::new(manual_compaction_option);
747        let task = self
748            .get_compact_task(compaction_group_id, &mut selector)
749            .await?;
750        if let Some(err) = selector.validation_error() {
751            return Err(Error::InvalidManualCompactionOption(err.to_owned()));
752        }
753        Ok((task, selector.blocked_by_pending()))
754    }
755
756    pub async fn report_compact_task(
757        &self,
758        task_id: u64,
759        task_status: TaskStatus,
760        sorted_output_ssts: Vec<SstableInfo>,
761        table_stats_change: Option<PbTableStatsMap>,
762        object_timestamps: HashMap<HummockSstableObjectId, u64>,
763    ) -> Result<bool> {
764        let rets = self
765            .report_compact_tasks(vec![ReportTask {
766                task_id,
767                task_status,
768                sorted_output_ssts,
769                table_stats_change: table_stats_change.unwrap_or_default(),
770                object_timestamps,
771            }])
772            .await?;
773        Ok(rets[0])
774    }
775
776    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
777        let compaction_guard = self.compaction.write().await;
778        let versioning_guard = self.versioning.write().await;
779
780        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
781            .await
782    }
783
784    /// Finishes or cancels a compaction task, according to `task_status`.
785    ///
786    /// If `context_id` is not None, its validity will be checked when writing meta store.
787    /// Its ownership of the task is checked as well.
788    ///
789    /// Return Ok(false) indicates either the task is not found,
790    /// or the task is not owned by `context_id` when `context_id` is not None.
791    pub async fn report_compact_tasks_impl(
792        &self,
793        report_tasks: Vec<ReportTask>,
794        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
795        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
796    ) -> Result<Vec<bool>> {
797        let deterministic_mode = self.env.opts.compaction_deterministic_test;
798        let compaction: &mut Compaction = &mut compaction_guard;
799        let start_time = Instant::now();
800        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
801        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
802        let mut rets = vec![false; report_tasks.len()];
803        let mut compact_task_assignment =
804            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
805        // The compaction task is finished.
806        let versioning: &mut Versioning = &mut versioning_guard;
807        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");
808
809        // purge stale compact_status
810        for group_id in original_keys {
811            if !versioning.current_version.levels.contains_key(&group_id) {
812                compact_statuses.remove(group_id);
813            }
814        }
815        let mut tasks = vec![];
816
817        let mut version = HummockVersionTransaction::new(
818            &mut versioning.current_version,
819            &mut versioning.hummock_version_deltas,
820            &mut versioning.table_change_log,
821            self.env.notification_manager(),
822            None,
823            &self.metrics,
824            &self.env.opts,
825        );
826
827        if deterministic_mode {
828            version.disable_apply_to_txn();
829        }
830
831        let mut version_stats = HummockVersionStatsTransaction::new(
832            &mut versioning.version_stats,
833            self.env.notification_manager(),
834        );
835        let mut success_count = 0;
836        let mut report_results = Vec::with_capacity(rets.len());
837        for (idx, task) in report_tasks.into_iter().enumerate() {
838            rets[idx] = true;
839            let task_id = task.task_id;
840            let mut task_status = task.task_status;
841            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
842                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
843                None => {
844                    tracing::warn!("{}", format!("compact task {} not found", task.task_id));
845                    rets[idx] = false;
846                    report_results.push(CompactionTaskReportResult {
847                        task_id,
848                        task_status,
849                        reported: false,
850                    });
851                    continue;
852                }
853            };
854
855            {
856                // apply result
857                compact_task.task_status = task.task_status;
858                compact_task.sorted_output_ssts = task.sorted_output_ssts;
859            }
860
861            match compact_statuses.get_mut(compact_task.compaction_group_id) {
862                Some(mut compact_status) => {
863                    compact_status.report_compact_task(&compact_task);
864                }
865                None => {
866                    // When the group_id is not found in the compaction_statuses, it means the group has been removed.
867                    // The task is invalid and should be canceled.
868                    // e.g.
869                    // 1. The group is removed by the user unregistering the tables
870                    // 2. The group is removed by the group scheduling algorithm
871                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
872                }
873            }
874
875            let is_success = if let TaskStatus::Success = compact_task.task_status {
876                match self
877                    .report_compaction_sanity_check(&task.object_timestamps)
878                    .await
879                {
880                    Err(e) => {
881                        warn!(
882                            "failed to commit compaction task {} {}",
883                            compact_task.task_id,
884                            e.as_report()
885                        );
886                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
887                        false
888                    }
889                    _ => {
890                        let group = version
891                            .latest_version()
892                            .levels
893                            .get(&compact_task.compaction_group_id)
894                            .unwrap();
895                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
896                        if is_expired {
897                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
898                            warn!(
899                                "The task may be expired because of group split, task:\n {:?}",
900                                compact_task_to_string(&compact_task)
901                            );
902                        }
903                        !is_expired
904                    }
905                }
906            } else {
907                false
908            };
909            if is_success {
910                success_count += 1;
911                version.apply_compact_task(&compact_task);
912                if purge_prost_table_stats(
913                    &mut version_stats.table_stats,
914                    version.latest_version(),
915                    &HashSet::default(),
916                ) {
917                    self.metrics.version_stats.reset();
918                    versioning.local_metrics.clear();
919                }
920                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
921                trigger_local_table_stat(
922                    &self.metrics,
923                    &mut versioning.local_metrics,
924                    &version_stats,
925                    &task.table_stats_change,
926                );
927            }
928            task_status = compact_task.task_status;
929            report_results.push(CompactionTaskReportResult {
930                task_id,
931                task_status,
932                reported: rets[idx],
933            });
934            tasks.push(compact_task);
935        }
936        if success_count > 0 {
937            commit_multi_var!(
938                self.meta_store_ref(),
939                compact_statuses,
940                compact_task_assignment,
941                version,
942                version_stats
943            )?;
944
945            self.metrics
946                .compact_task_batch_count
947                .with_label_values(&["batch_report_task"])
948                .observe(success_count as f64);
949        } else {
950            // The compaction task is cancelled or failed.
951            commit_multi_var!(
952                self.meta_store_ref(),
953                compact_statuses,
954                compact_task_assignment
955            )?;
956        }
957
958        self.notify_compaction_task_report_waiters(report_results);
959
960        let mut success_groups = vec![];
961        for compact_task in &tasks {
962            self.compactor_manager
963                .remove_task_heartbeat(compact_task.task_id);
964            tracing::trace!(
965                "Reported compaction task. {}. cost time: {:?}",
966                compact_task_to_string(compact_task),
967                start_time.elapsed(),
968            );
969
970            if !deterministic_mode
971                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
972                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
973            {
974                // only try send Dynamic compaction
975                self.try_send_compaction_request(
976                    compact_task.compaction_group_id,
977                    compact_task::TaskType::Dynamic,
978                );
979            }
980
981            if compact_task.task_status == TaskStatus::Success {
982                success_groups.push(compact_task.compaction_group_id);
983            }
984        }
985
986        trigger_compact_tasks_stat(
987            &self.metrics,
988            &tasks,
989            &compaction.compaction_statuses,
990            &versioning_guard.current_version,
991        );
992        drop(versioning_guard);
993        if !success_groups.is_empty() {
994            self.try_update_write_limits(&success_groups).await;
995        }
996        Ok(rets)
997    }
998
999    /// Triggers compacitons to specified compaction groups.
1000    /// Don't wait for compaction finish
1001    pub async fn trigger_compaction_deterministic(
1002        &self,
1003        _base_version_id: HummockVersionId,
1004        compaction_groups: Vec<CompactionGroupId>,
1005    ) -> Result<()> {
1006        self.on_current_version(|old_version| {
1007            tracing::info!(
1008                "Trigger compaction for version {}, groups {:?}",
1009                old_version.id,
1010                compaction_groups
1011            );
1012        })
1013        .await;
1014
1015        if compaction_groups.is_empty() {
1016            return Ok(());
1017        }
1018        for compaction_group in compaction_groups {
1019            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1020        }
1021        Ok(())
1022    }
1023
1024    pub async fn trigger_manual_compaction(
1025        &self,
1026        compaction_group: CompactionGroupId,
1027        manual_compaction_option: ManualCompactionOption,
1028    ) -> Result<ManualCompactionTriggerResult> {
1029        let start_time = Instant::now();
1030        let exclusive = manual_compaction_option.exclusive;
1031
1032        // 1. Get idle compactor.
1033        let compactor = match self.compactor_manager.next_compactor() {
1034            Some(compactor) => compactor,
1035            None => {
1036                tracing::warn!("trigger_manual_compaction No compactor is available.");
1037                return Err(anyhow::anyhow!(
1038                    "trigger_manual_compaction No compactor is available. compaction_group {}",
1039                    compaction_group
1040                )
1041                .into());
1042            }
1043        };
1044
1045        // 2. Get manual compaction task.
1046        let compact_task = self
1047            .manual_get_compact_task_with_info(compaction_group, manual_compaction_option)
1048            .await;
1049        let (compact_task, blocked_by_pending) = match compact_task {
1050            Ok((compact_task, blocked_by_pending)) => (compact_task, blocked_by_pending),
1051            Err(err) => {
1052                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
1053                if matches!(err, Error::InvalidManualCompactionOption(_)) {
1054                    return Err(err);
1055                }
1056
1057                return Err(anyhow::anyhow!(err)
1058                    .context(format!(
1059                        "Failed to get compaction task for compaction_group {}",
1060                        compaction_group,
1061                    ))
1062                    .into());
1063            }
1064        };
1065        let compact_task = match compact_task {
1066            Some(compact_task) => compact_task,
1067            None => {
1068                if exclusive && blocked_by_pending {
1069                    return Ok(ManualCompactionTriggerResult::Retry);
1070                }
1071                // No compaction task available.
1072                return Err(anyhow::anyhow!(
1073                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
1074                    compaction_group
1075                )
1076                .into());
1077            }
1078        };
1079
1080        // 3. send task to compactor
1081        let task_id = compact_task.task_id;
1082        let compact_task_string = compact_task_to_string(&compact_task);
1083        tracing::info!(
1084            compact_task_string,
1085            duration = ?start_time.elapsed(),
1086            "Triggered manual compaction task."
1087        );
1088
1089        let report_rx = self.register_compaction_task_report_waiter(task_id);
1090        if let Err(err) = compactor
1091            .send_event(ResponseEvent::CompactTask(compact_task.into()))
1092            .with_context(|| {
1093                format!(
1094                    "Failed to trigger compaction task for compaction_group {}",
1095                    compaction_group,
1096                )
1097            })
1098        {
1099            self.remove_compaction_task_report_waiter(task_id);
1100            return Err(err.into());
1101        }
1102
1103        let report_result = match report_rx.await {
1104            Ok(result) => result,
1105            Err(_) => {
1106                self.remove_compaction_task_report_waiter(task_id);
1107                return Err(anyhow::anyhow!(
1108                    "trigger_manual_compaction wait report failed. compaction_group {}",
1109                    compaction_group
1110                )
1111                .into());
1112            }
1113        };
1114        if !report_result.reported {
1115            return Err(anyhow::anyhow!(
1116                "trigger_manual_compaction report not accepted. task_id {}",
1117                report_result.task_id
1118            )
1119            .into());
1120        }
1121
1122        if report_result.task_status == TaskStatus::NoAvailCpuResourceCanceled
1123            || report_result.task_status == TaskStatus::NoAvailMemoryResourceCanceled
1124        {
1125            return Ok(ManualCompactionTriggerResult::Retry);
1126        }
1127
1128        tracing::info!(
1129            ?report_result,
1130            duration = ?start_time.elapsed(),
1131            "Completed manual compaction task."
1132        );
1133
1134        Ok(ManualCompactionTriggerResult::Submitted)
1135    }
1136
1137    /// Sends a compaction request for new data (clears cooldown).
1138    pub fn try_send_compaction_request(
1139        &self,
1140        compaction_group: CompactionGroupId,
1141        task_type: compact_task::TaskType,
1142    ) -> bool {
1143        self.compaction_state.try_sched_compaction(
1144            compaction_group,
1145            task_type,
1146            ScheduleTrigger::NewData,
1147        )
1148    }
1149
1150    /// Apply `split_weight_by_vnode` based partition strategy.
1151    /// This handles dynamic partitioning based on table size and write throughput.
1152    fn apply_split_weight_by_vnode_partition(
1153        &self,
1154        compact_task: &mut CompactTask,
1155        compaction_config: &CompactionConfig,
1156        compact_table_ids: &[TableId],
1157    ) {
1158        if compaction_config.split_weight_by_vnode > 0 {
1159            for table_id in compact_table_ids {
1160                compact_task
1161                    .table_vnode_partition
1162                    .insert(*table_id, compact_task.split_weight_by_vnode);
1163            }
1164
1165            return;
1166        }
1167
1168        // Calculate per-table size from normalized input SSTs.
1169        let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
1170        for input_ssts in &compact_task.input_ssts {
1171            for sst in &input_ssts.table_infos {
1172                for table_id in &sst.table_ids {
1173                    *table_size_info.entry(*table_id).or_default() +=
1174                        sst.sst_size / (sst.table_ids.len() as u64);
1175                }
1176            }
1177        }
1178
1179        let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
1180        let default_partition_count = self.env.opts.partition_vnode_count;
1181        let compact_task_table_size_partition_threshold_low = self
1182            .env
1183            .opts
1184            .compact_task_table_size_partition_threshold_low;
1185        let compact_task_table_size_partition_threshold_high = self
1186            .env
1187            .opts
1188            .compact_task_table_size_partition_threshold_high;
1189
1190        // Check latest write throughput
1191        let table_write_throughput_statistic_manager =
1192            self.table_write_throughput_statistic_manager.read();
1193        let timestamp = chrono::Utc::now().timestamp();
1194
1195        for (table_id, compact_table_size) in table_size_info {
1196            let write_throughput = table_write_throughput_statistic_manager
1197                .get_table_throughput_descending(table_id, timestamp)
1198                .peekable()
1199                .peek()
1200                .map(|item| item.throughput)
1201                .unwrap_or(0);
1202
1203            if compact_table_size > compact_task_table_size_partition_threshold_high
1204                && default_partition_count > 0
1205            {
1206                compact_task
1207                    .table_vnode_partition
1208                    .insert(table_id, default_partition_count);
1209            } else if (compact_table_size > compact_task_table_size_partition_threshold_low
1210                || (write_throughput > self.env.opts.table_high_write_throughput_threshold
1211                    && compact_table_size > compaction_config.target_file_size_base))
1212                && hybrid_vnode_count > 0
1213            {
1214                compact_task
1215                    .table_vnode_partition
1216                    .insert(table_id, hybrid_vnode_count);
1217            } else if compact_table_size > compaction_config.target_file_size_base {
1218                compact_task.table_vnode_partition.insert(table_id, 1);
1219            }
1220        }
1221
1222        compact_task
1223            .table_vnode_partition
1224            .retain(|table_id, _| compact_table_ids.contains(table_id));
1225    }
1226
1227    pub(crate) fn calculate_vnode_partition(
1228        &self,
1229        compact_task: &mut CompactTask,
1230        compaction_config: &CompactionConfig,
1231        compact_table_ids: &[TableId],
1232    ) {
1233        // Do not split sst by vnode partition when target_level > base_level
1234        // The purpose of data alignment is mainly to improve the parallelism of base level compaction
1235        // and reduce write amplification. However, at high level, the size of the sst file is often
1236        // larger and only contains the data of a single table_id, so there is no need to cut it.
1237        if compact_task.target_level > compact_task.base_level {
1238            return;
1239        }
1240
1241        // Apply split_weight_by_vnode based partition strategy
1242        self.apply_split_weight_by_vnode_partition(
1243            compact_task,
1244            compaction_config,
1245            compact_table_ids,
1246        );
1247    }
1248
1249    fn build_ready_compact_task(
1250        &self,
1251        picked_task: PickedCompactionTask,
1252        context: CompactTaskBuildContext,
1253        table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
1254        all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
1255    ) -> BuiltCompactTask {
1256        let compaction_config = context.compaction_config.clone();
1257        let (mut compact_task, compact_table_ids) = build_base_compact_task(picked_task, context);
1258
1259        if compact_task.is_trivial_reclaim() {
1260            compact_task.task_status = TaskStatus::Success;
1261            compact_task.sorted_output_ssts.clear();
1262            return BuiltCompactTask::MetaFinished(compact_task);
1263        }
1264
1265        if compact_task.is_trivial_move_task() {
1266            compact_task.task_status = TaskStatus::Success;
1267            compact_task.sorted_output_ssts = compact_task.input_ssts[0]
1268                .read_sstable_infos()
1269                .cloned()
1270                .collect();
1271            return BuiltCompactTask::MetaFinished(compact_task);
1272        }
1273
1274        self.prepare_compact_task_for_assignment(
1275            &mut compact_task,
1276            compaction_config.as_ref(),
1277            &compact_table_ids,
1278            safe_epoch_table_watermarks_impl(table_watermarks, &compact_table_ids),
1279            all_versioned_table_schemas,
1280        );
1281
1282        BuiltCompactTask::PendingAssignment(compact_task)
1283    }
1284
1285    fn prepare_compact_task_for_assignment(
1286        &self,
1287        compact_task: &mut CompactTask,
1288        compaction_config: &CompactionConfig,
1289        compact_table_ids: &[TableId],
1290        table_watermarks: BTreeMap<TableId, TableWatermarks>,
1291        all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
1292    ) {
1293        self.calculate_vnode_partition(compact_task, compaction_config, compact_table_ids);
1294        attach_compact_task_table_metadata(
1295            compact_task,
1296            compact_table_ids,
1297            table_watermarks,
1298            all_versioned_table_schemas,
1299        );
1300    }
1301
1302    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
1303        self.compactor_manager.clone()
1304    }
1305
1306    fn register_compaction_task_report_waiter(
1307        &self,
1308        task_id: HummockCompactionTaskId,
1309    ) -> Receiver<CompactionTaskReportResult> {
1310        let (tx, rx) = tokio::sync::oneshot::channel();
1311        self.compaction_task_report_notifiers
1312            .lock()
1313            .register(task_id, tx);
1314        rx
1315    }
1316
1317    fn remove_compaction_task_report_waiter(&self, task_id: HummockCompactionTaskId) {
1318        self.compaction_task_report_notifiers.lock().remove(task_id);
1319    }
1320
1321    fn notify_compaction_task_report_waiters(&self, results: Vec<CompactionTaskReportResult>) {
1322        let mut guard = self.compaction_task_report_notifiers.lock();
1323        for result in results {
1324            guard.notify(result);
1325        }
1326    }
1327}
1328
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Test helper: returns a clone of the assignment recorded for `task_id`, if any.
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction = self.compaction.read().await;
        compaction.compact_task_assignment.get(&task_id).cloned()
    }

    /// Test helper: reports a result for `task_id`.
    ///
    /// When `compact_task` is given, it is first (re)inserted into
    /// `compact_task_assignment` under `task_id` with context id 0. Tests may
    /// have modified their copy of the task without touching the stored
    /// assignment, and this way the modified copy is the one that gets reported.
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let assignment = CompactTaskAssignment {
                compact_task: Some(task.into()),
                context_id: 0.into(),
            };
            // The write guard is a temporary, released before reporting re-locks.
            self.compaction
                .write()
                .await
                .compact_task_assignment
                .insert(task_id, assignment);
        }

        let report = ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        };
        self.report_compact_tasks(vec![report]).await.map(|_| ())
    }
}
1372
/// Reason a compaction schedule request was issued.
///
/// Only `Dynamic` requests look at the trigger; other task types ignore it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ScheduleTrigger {
    /// Fresh data arrived (e.g. `commit_epoch`); clears the group's Dynamic cooldown.
    NewData,
    /// Fired by a periodic timer; a Dynamic request is dropped while the group is
    /// in cooldown.
    Periodic,
}
1381
1382/// A point-in-time snapshot of the compaction schedule state.
1383///
1384/// `snapshot_time` is used by `unschedule()` to detect whether new data arrived
1385/// after the snapshot was taken, preventing incorrect cooldown.
1386pub struct CompactionScheduleSnapshot {
1387    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
1388    snapshot_time: Instant,
1389}
1390
1391impl CompactionScheduleSnapshot {
1392    /// Task type priority order for scheduling (checked first = higher priority).
1393    const TASK_TYPE_PRIORITY: &[TaskType] = &[
1394        TaskType::Dynamic,
1395        TaskType::SpaceReclaim,
1396        TaskType::Ttl,
1397        TaskType::Tombstone,
1398        TaskType::VnodeWatermark,
1399    ];
1400
1401    pub fn snapshot_time(&self) -> Instant {
1402        self.snapshot_time
1403    }
1404
1405    /// Pick compaction groups and task type from this snapshot.
1406    ///
1407    /// Returns groups in shuffled order. Non-Dynamic types have higher priority
1408    /// and return a single group; Dynamic groups are batched together.
1409    pub fn pick_compaction_groups_and_type(&self) -> Option<(Vec<CompactionGroupId>, TaskType)> {
1410        let group_ids = self.group_ids_shuffled();
1411        let mut normal_groups = vec![];
1412        for cg_id in group_ids {
1413            if let Some(pick_type) = self.pick_type(cg_id) {
1414                if pick_type == TaskType::Dynamic {
1415                    normal_groups.push(cg_id);
1416                } else if normal_groups.is_empty() {
1417                    return Some((vec![cg_id], pick_type));
1418                }
1419            }
1420        }
1421        if normal_groups.is_empty() {
1422            None
1423        } else {
1424            Some((normal_groups, TaskType::Dynamic))
1425        }
1426    }
1427
1428    fn group_ids_shuffled(&self) -> Vec<CompactionGroupId> {
1429        let mut group_ids: Vec<_> = self.scheduled.iter().map(|(g, _)| *g).unique().collect();
1430        group_ids.shuffle(&mut thread_rng());
1431        group_ids
1432    }
1433
1434    fn pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1435        Self::TASK_TYPE_PRIORITY
1436            .iter()
1437            .find(|t| self.scheduled.contains(&(group, **t)))
1438            .copied()
1439    }
1440}
1441
1442/// Tracks which (`compaction_group`, `task_type`) pairs are scheduled for compaction.
1443///
1444/// For `Dynamic` type, includes a cooldown mechanism: groups with no compaction work
1445/// are skipped by periodic triggers until new data arrives via `commit_epoch`.
1446#[derive(Debug, Default)]
1447pub struct CompactionState {
1448    inner: Mutex<CompactionStateInner>,
1449}
1450
1451#[derive(Debug, Default)]
1452struct CompactionStateInner {
1453    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
1454    /// Groups skipped by periodic Dynamic trigger until new data arrives.
1455    dynamic_cooldown: HashSet<CompactionGroupId>,
1456    /// Tracks new-data arrival time per group for cooldown race detection.
1457    last_new_data_time: HashMap<CompactionGroupId, Instant>,
1458}
1459
1460impl CompactionState {
1461    pub fn new() -> Self {
1462        Self {
1463            inner: Default::default(),
1464        }
1465    }
1466
1467    /// Enqueues a compaction request. Returns `true` if newly scheduled.
1468    ///
1469    /// `trigger` only affects `Dynamic` type — see [`ScheduleTrigger`].
1470    pub fn try_sched_compaction(
1471        &self,
1472        compaction_group: CompactionGroupId,
1473        task_type: TaskType,
1474        trigger: ScheduleTrigger,
1475    ) -> bool {
1476        let mut guard = self.inner.lock();
1477        if task_type == TaskType::Dynamic {
1478            match trigger {
1479                ScheduleTrigger::NewData => {
1480                    guard.dynamic_cooldown.remove(&compaction_group);
1481                    guard
1482                        .last_new_data_time
1483                        .insert(compaction_group, Instant::now());
1484                }
1485                ScheduleTrigger::Periodic => {
1486                    if guard.dynamic_cooldown.contains(&compaction_group) {
1487                        return false;
1488                    }
1489                }
1490            }
1491        }
1492        guard.scheduled.insert((compaction_group, task_type))
1493    }
1494
1495    /// Removes a scheduled entry. For Dynamic type, adds to cooldown unless
1496    /// new data arrived after `snapshot_time`.
1497    pub fn unschedule(
1498        &self,
1499        compaction_group: CompactionGroupId,
1500        task_type: compact_task::TaskType,
1501        snapshot_time: Instant,
1502    ) {
1503        let mut guard = self.inner.lock();
1504        guard.scheduled.remove(&(compaction_group, task_type));
1505        if task_type == TaskType::Dynamic {
1506            let has_new_data = guard
1507                .last_new_data_time
1508                .get(&compaction_group)
1509                .is_some_and(|t| *t > snapshot_time);
1510            if !has_new_data {
1511                guard.dynamic_cooldown.insert(compaction_group);
1512            }
1513        }
1514    }
1515
1516    /// Takes a snapshot of the current schedule state.
1517    pub fn snapshot(&self) -> CompactionScheduleSnapshot {
1518        let guard = self.inner.lock();
1519        // Record time after lock to ensure accurate ordering vs. try_sched_compaction
1520        let snapshot_time = Instant::now();
1521        CompactionScheduleSnapshot {
1522            scheduled: guard.scheduled.clone(),
1523            snapshot_time,
1524        }
1525    }
1526
1527    /// Removes all schedule state for a deleted or merged group.
1528    pub fn remove_compaction_group(&self, compaction_group: CompactionGroupId) {
1529        let mut guard = self.inner.lock();
1530        guard
1531            .scheduled
1532            .retain(|(group, _)| *group != compaction_group);
1533        guard.dynamic_cooldown.remove(&compaction_group);
1534        guard.last_new_data_time.remove(&compaction_group);
1535    }
1536}
1537
1538impl Compaction {
1539    pub fn get_compact_task_assignments_by_group_id(
1540        &self,
1541        compaction_group_id: CompactionGroupId,
1542    ) -> Vec<CompactTaskAssignment> {
1543        self.compact_task_assignment
1544            .iter()
1545            .filter_map(|(_, assignment)| {
1546                if assignment
1547                    .compact_task
1548                    .as_ref()
1549                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
1550                {
1551                    Some(CompactTaskAssignment {
1552                        compact_task: assignment.compact_task.clone(),
1553                        context_id: assignment.context_id,
1554                    })
1555                } else {
1556                    None
1557                }
1558            })
1559            .collect()
1560    }
1561}
1562
1563#[derive(Clone, Default)]
1564pub struct CompactionGroupStatistic {
1565    pub group_id: CompactionGroupId,
1566    pub group_size: u64,
1567    pub table_statistic: BTreeMap<StateTableId, u64>,
1568    pub compaction_group_config: CompactionGroup,
1569}
1570
1571/// Updates table stats caused by vnode watermark trivial reclaim compaction.
1572fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1573    table_stats: &mut PbTableStatsMap,
1574    task: &CompactTask,
1575) {
1576    if task.task_type != TaskType::VnodeWatermark {
1577        return;
1578    }
1579    let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
1580    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1581        assert_eq!(s.table_ids.len(), 1);
1582        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1583        *e += s.total_key_count;
1584    }
1585    for (table_id, delete_count) in deleted_table_keys {
1586        let Some(stats) = table_stats.get_mut(&table_id) else {
1587            continue;
1588        };
1589        if stats.total_key_count == 0 {
1590            continue;
1591        }
1592        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1593        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1594        // total_key_count is updated accurately.
1595        stats.total_key_count = new_total_key_count;
1596        // others are updated approximately.
1597        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1598        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1599    }
1600}
1601
/// Health classification of a compaction group.
#[derive(Clone, Debug)]
pub enum GroupState {
    /// The group is in neither emergency nor write-stop state.
    Normal,

    /// The group is in emergency state; the payload is a human-readable reason.
    Emergency(String),

    /// The group is in write-stop state; the payload is a human-readable reason.
    WriteStop(String),
}

impl GroupState {
    /// Returns `true` only for [`GroupState::WriteStop`].
    pub fn is_write_stop(&self) -> bool {
        matches!(*self, Self::WriteStop(..))
    }

    /// Returns `true` only for [`GroupState::Emergency`].
    pub fn is_emergency(&self) -> bool {
        matches!(*self, Self::Emergency(..))
    }

    /// Returns the reason carried by `Emergency` / `WriteStop`, or `None` for `Normal`.
    pub fn reason(&self) -> Option<&str> {
        match self {
            Self::Normal => None,
            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason.as_str()),
        }
    }
}
1630
/// Stateless holder for the checks that classify a compaction group as normal,
/// emergency or write-stop.
#[derive(Default, Clone)]
pub struct GroupStateValidator;
1633
1634impl GroupStateValidator {
1635    pub fn write_stop_sub_level_count(
1636        level_count: usize,
1637        compaction_config: &CompactionConfig,
1638    ) -> bool {
1639        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1640        level_count > threshold
1641    }
1642
1643    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1644        l0_size
1645            > compaction_config
1646                .level0_stop_write_threshold_max_size
1647                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1648    }
1649
1650    pub fn write_stop_l0_file_count(
1651        l0_file_count: usize,
1652        compaction_config: &CompactionConfig,
1653    ) -> bool {
1654        l0_file_count
1655            > compaction_config
1656                .level0_stop_write_threshold_max_sst_count
1657                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1658                as usize
1659    }
1660
1661    pub fn emergency_l0_file_count(
1662        l0_file_count: usize,
1663        compaction_config: &CompactionConfig,
1664    ) -> bool {
1665        l0_file_count
1666            > compaction_config
1667                .emergency_level0_sst_file_count
1668                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1669                as usize
1670    }
1671
1672    pub fn emergency_l0_partition_count(
1673        last_l0_sub_level_partition_count: usize,
1674        compaction_config: &CompactionConfig,
1675    ) -> bool {
1676        last_l0_sub_level_partition_count
1677            > compaction_config
1678                .emergency_level0_sub_level_partition
1679                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1680                as usize
1681    }
1682
1683    pub fn check_single_group_write_stop(
1684        levels: &Levels,
1685        compaction_config: &CompactionConfig,
1686    ) -> GroupState {
1687        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1688            return GroupState::WriteStop(format!(
1689                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1690                levels.l0.sub_levels.len(),
1691                compaction_config.level0_stop_write_threshold_sub_level_number
1692            ));
1693        }
1694
1695        if Self::write_stop_l0_file_count(
1696            levels
1697                .l0
1698                .sub_levels
1699                .iter()
1700                .map(|l| l.table_infos.len())
1701                .sum(),
1702            compaction_config,
1703        ) {
1704            return GroupState::WriteStop(format!(
1705                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1706                levels
1707                    .l0
1708                    .sub_levels
1709                    .iter()
1710                    .map(|l| l.table_infos.len())
1711                    .sum::<usize>(),
1712                compaction_config
1713                    .level0_stop_write_threshold_max_sst_count
1714                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1715            ));
1716        }
1717
1718        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1719            return GroupState::WriteStop(format!(
1720                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1721                levels.l0.total_file_size,
1722                compaction_config
1723                    .level0_stop_write_threshold_max_size
1724                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1725            ));
1726        }
1727
1728        GroupState::Normal
1729    }
1730
1731    pub fn check_single_group_emergency(
1732        levels: &Levels,
1733        compaction_config: &CompactionConfig,
1734    ) -> GroupState {
1735        if Self::emergency_l0_file_count(
1736            levels
1737                .l0
1738                .sub_levels
1739                .iter()
1740                .map(|l| l.table_infos.len())
1741                .sum(),
1742            compaction_config,
1743        ) {
1744            return GroupState::Emergency(format!(
1745                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1746                levels
1747                    .l0
1748                    .sub_levels
1749                    .iter()
1750                    .map(|l| l.table_infos.len())
1751                    .sum::<usize>(),
1752                compaction_config
1753                    .emergency_level0_sst_file_count
1754                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1755            ));
1756        }
1757
1758        if Self::emergency_l0_partition_count(
1759            levels
1760                .l0
1761                .sub_levels
1762                .first()
1763                .map(|l| l.table_infos.len())
1764                .unwrap_or(0),
1765            compaction_config,
1766        ) {
1767            return GroupState::Emergency(format!(
1768                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1769                levels
1770                    .l0
1771                    .sub_levels
1772                    .first()
1773                    .map(|l| l.table_infos.len())
1774                    .unwrap_or(0),
1775                compaction_config
1776                    .emergency_level0_sub_level_partition
1777                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1778            ));
1779        }
1780
1781        GroupState::Normal
1782    }
1783
1784    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1785        let state = Self::check_single_group_write_stop(levels, compaction_config);
1786        if state.is_write_stop() {
1787            return state;
1788        }
1789
1790        Self::check_single_group_emergency(levels, compaction_config)
1791    }
1792}
1793
#[cfg(test)]
mod prefetched_task_id_tests {
    use crate::manager::MetaOpts;

    /// Pins `compaction_task_id_refill_capacity` of `MetaOpts::test(false)` to 64.
    #[test]
    fn test_compaction_task_id_refill_capacity_default() {
        let opts = MetaOpts::test(false);
        let expected_capacity = 64;
        assert_eq!(opts.compaction_task_id_refill_capacity, expected_capacity);
    }
}
1803
#[cfg(test)]
mod compaction_state_tests {
    use risingwave_pb::hummock::compact_task::TaskType;

    use super::*;

    #[test]
    fn test_basic_schedule_and_unschedule() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // First schedule should succeed
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        // Duplicate schedule should fail
        assert!(!state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        // Different task type should succeed
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));

        // Snapshot should contain both
        let snapshot = state.snapshot();
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Ttl)));

        // Unschedule removes from scheduled set
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        let snapshot2 = state.snapshot();
        assert!(!snapshot2.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot2.scheduled.contains(&(group_id, TaskType::Ttl)));
    }

    #[test]
    fn test_cooldown_blocks_periodic_trigger() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Schedule then unschedule - should add to cooldown
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        // Verify in cooldown
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        // Periodic trigger should be blocked
        assert!(!state.try_sched_compaction(
            group_id,
            TaskType::Dynamic,
            ScheduleTrigger::Periodic
        ));
    }

    #[test]
    fn test_new_data_clears_cooldown() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Put group in cooldown
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        // NewData trigger should clear cooldown and schedule
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id));
    }

    #[test]
    fn test_cooldown_only_affects_dynamic_type() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Put group in cooldown for Dynamic
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        // Ttl unschedule should NOT add to cooldown
        let group_id_2: CompactionGroupId = 2.into();
        assert!(state.try_sched_compaction(group_id_2, TaskType::Ttl, ScheduleTrigger::Periodic));
        let snapshot2 = state.snapshot();
        state.unschedule(group_id_2, TaskType::Ttl, snapshot2.snapshot_time());
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id_2));

        // Other task types should work regardless of cooldown
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        assert!(state.try_sched_compaction(
            group_id,
            TaskType::SpaceReclaim,
            ScheduleTrigger::Periodic
        ));
    }

    #[test]
    fn test_race_condition_new_data_after_snapshot() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();

        // Simulate new data arriving strictly AFTER the snapshot. Use an explicit offset
        // rather than a second `Instant::now()`: `Instant` is only guaranteed to be
        // non-decreasing, so a fresh reading may equal `snapshot_time`, and the strict `>`
        // comparison in `unschedule` would then (flakily) treat it as old data.
        {
            let mut guard = state.inner.lock();
            guard.last_new_data_time.insert(
                group_id,
                snapshot.snapshot_time() + std::time::Duration::from_millis(1),
            );
        }

        // Unschedule should NOT add to cooldown (new data arrived after snapshot)
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(
            !state.inner.lock().dynamic_cooldown.contains(&group_id),
            "Should skip cooldown when new data arrived after snapshot"
        );
    }

    #[test]
    fn test_new_data_at_snapshot_time_still_enters_cooldown() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();

        // Data stamped exactly at the snapshot instant is not newer than the snapshot:
        // `unschedule` compares with a strict `>`, so the group must still cool down.
        {
            let mut guard = state.inner.lock();
            guard
                .last_new_data_time
                .insert(group_id, snapshot.snapshot_time());
        }

        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(
            state.inner.lock().dynamic_cooldown.contains(&group_id),
            "Data stamped at the snapshot instant must not skip cooldown"
        );
    }

    #[test]
    fn test_remove_compaction_group_cleans_all_state() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Set up state
        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        state.inner.lock().dynamic_cooldown.insert(group_id);

        // Remove group
        state.remove_compaction_group(group_id);

        // Verify all state cleaned up
        let guard = state.inner.lock();
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Ttl)));
        assert!(!guard.dynamic_cooldown.contains(&group_id));
        assert!(!guard.last_new_data_time.contains_key(&group_id));
    }

    #[test]
    fn test_snapshot_pick_type_priority() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        // Empty group returns None
        assert_eq!(state.snapshot().pick_type(group_id), None);

        // Priority order: Dynamic > SpaceReclaim > Ttl > Tombstone > VnodeWatermark.
        // Each schedule is asserted so a failed setup is reported at its source.
        assert!(state.try_sched_compaction(
            group_id,
            TaskType::VnodeWatermark,
            ScheduleTrigger::Periodic,
        ));
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::VnodeWatermark)
        );

        assert!(state.try_sched_compaction(
            group_id,
            TaskType::Tombstone,
            ScheduleTrigger::Periodic
        ));
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Tombstone)
        );

        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        assert_eq!(state.snapshot().pick_type(group_id), Some(TaskType::Ttl));

        assert!(state.try_sched_compaction(
            group_id,
            TaskType::SpaceReclaim,
            ScheduleTrigger::Periodic
        ));
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::SpaceReclaim)
        );

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Dynamic)
        );
    }

    #[test]
    fn test_multiple_groups_independent_cooldown() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        assert!(state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();

        // Only unschedule g1
        state.unschedule(g1, TaskType::Dynamic, snapshot.snapshot_time());

        let guard = state.inner.lock();
        assert!(guard.dynamic_cooldown.contains(&g1));
        assert!(!guard.dynamic_cooldown.contains(&g2));
    }

    #[test]
    fn test_pick_compaction_groups_empty() {
        let state = CompactionState::new();
        let snapshot = state.snapshot();
        // No scheduled groups → returns None
        assert!(snapshot.pick_compaction_groups_and_type().is_none());
    }

    #[test]
    fn test_pick_compaction_groups_mixed_types() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();
        let g3: CompactionGroupId = 3.into();

        // g1: Dynamic, g2: Ttl, g3: Dynamic
        assert!(state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(g2, TaskType::Ttl, ScheduleTrigger::Periodic));
        assert!(state.try_sched_compaction(g3, TaskType::Dynamic, ScheduleTrigger::NewData));

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();

        // Due to shuffle, either:
        // - Ttl group is encountered first → returns (vec![g2], Ttl)
        // - Dynamic group is encountered first → collects all Dynamic, skips Ttl
        //   → returns ([g1, g3] in some order, Dynamic)
        if task_type == TaskType::Dynamic {
            assert!(groups.contains(&g1));
            assert!(groups.contains(&g3));
            assert!(!groups.contains(&g2)); // Ttl group excluded from Dynamic result
        } else {
            assert_eq!(task_type, TaskType::Ttl);
            assert_eq!(groups, vec![g2]);
        }
    }

    #[test]
    fn test_pick_compaction_groups_all_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        assert!(state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData));

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::Dynamic);
        assert!(groups.contains(&g1));
        assert!(groups.contains(&g2));
    }

    #[test]
    fn test_pick_compaction_groups_single_non_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(
            g1,
            TaskType::SpaceReclaim,
            ScheduleTrigger::Periodic
        ));

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::SpaceReclaim);
        assert_eq!(groups, vec![g1]);
    }
}