risingwave_meta/hummock/manager/compaction/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::Instant;

use anyhow::Context;
use compaction_event_loop::{
    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
    HummockCompactorDedicatedEventLoop,
};
use fail::fail_point;
use itertools::Itertools;
use parking_lot::Mutex;
use rand::rng as thread_rng;
use rand::seq::SliceRandom;
use risingwave_common::catalog::TableId;
use risingwave_common::config::meta::default::compaction_config;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::level::Levels;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
};
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
    HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
};
use thiserror_ext::AsReport;
use tokio::sync::RwLockWriteGuard;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;
use tracing::warn;

use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::{
    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
};
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::{
    HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
    trigger_local_table_stat,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::next_compaction_task_id;
use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
use crate::manager::META_NODE_ID;
use crate::model::BTreeMapTransaction;

pub mod compaction_event_loop;
pub mod compaction_group_manager;
pub mod compaction_group_schedule;

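/// Task statuses that count as a cancellation; `cancel_compact_task_impl`
/// asserts that every status it reports is one of these.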
static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
    [
        TaskStatus::ManualCanceled,
        TaskStatus::SendFailCanceled,
        TaskStatus::AssignFailCanceled,
        TaskStatus::HeartbeatCanceled,
        TaskStatus::InvalidGroupCanceled,
        TaskStatus::NoAvailMemoryResourceCanceled,
        TaskStatus::NoAvailCpuResourceCanceled,
        TaskStatus::HeartbeatProgressCanceled,
    ]
    .into_iter()
    .collect()
});

type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);

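/// Builds the default selector for each background compaction task type.
///
/// A minimal lookup sketch (illustrative only):
///
/// ```ignore
/// let mut selectors = init_selectors();
/// let selector = selectors
///     .get_mut(&compact_task::TaskType::Dynamic)
///     .expect("dynamic selector is always registered");
/// assert_eq!(selector.task_type(), compact_task::TaskType::Dynamic);
/// ```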
fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
        HashMap::default();
    compaction_selectors.insert(
        compact_task::TaskType::Dynamic,
        Box::<DynamicLevelSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::SpaceReclaim,
        Box::<SpaceReclaimCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Ttl,
        Box::<TtlCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Tombstone,
        Box::<TombstoneCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::VnodeWatermark,
        Box::<VnodeWatermarkCompactionSelector>::default(),
    );
    compaction_selectors
}

impl HummockVersionTransaction<'_> {
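    /// Applies a finished compact task to the in-flight version transaction:
    /// the input SSTs are recorded as removed from their source levels, the
    /// sorted output SSTs are inserted into the target (sub-)level, and the
    /// resulting delta is pre-applied.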
    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
        let mut version_delta = self.new_delta();
        let trivial_move = compact_task.is_trivial_move_task();
        version_delta.trivial_move = trivial_move;

        let group_deltas = &mut version_delta
            .group_deltas
            .entry(compact_task.compaction_group_id)
            .or_default()
            .group_deltas;
        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
            BTreeMap::default();

        for level in &compact_task.input_ssts {
            let level_idx = level.level_idx;

            removed_table_ids_map
                .entry(level_idx)
                .or_default()
                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
        }

        for (level_idx, removed_table_ids) in removed_table_ids_map {
            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
                level_idx,
                0, // default
                removed_table_ids,
                vec![], // default
                0,      // default
                compact_task.compaction_group_version_id,
            ));

            group_deltas.push(group_delta);
        }

        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
            compact_task.target_level,
            compact_task.target_sub_level_id,
            HashSet::new(), // default
            compact_task.sorted_output_ssts.clone(),
            compact_task.split_weight_by_vnode,
            compact_task.compaction_group_version_id,
        ));

        group_deltas.push(group_delta);
        version_delta.pre_apply();
    }
}

#[derive(Default)]
pub struct Compaction {
    /// Compaction tasks that are already assigned to a compactor
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// `CompactStatus` of each compaction group
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    pub _deterministic_mode: bool,
}

impl HummockManager {
    pub async fn get_assigned_compact_task_num(&self) -> u64 {
        self.compaction.read().await.compact_task_assignment.len() as u64
    }

    pub async fn list_compaction_status(
        &self,
    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
        let compaction = self.compaction.read().await;
        (
            compaction.compaction_statuses.values().map_into().collect(),
            compaction
                .compact_task_assignment
                .values()
                .cloned()
                .collect(),
        )
    }

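    /// Returns the per-level compaction scores that the dynamic level selector
    /// would compute for the given group, or an empty vector if the group, its
    /// levels, or its config cannot be found.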
    pub async fn get_compaction_scores(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<PickerInfo> {
        let (status, levels, group) = {
            let compaction = self.compaction.read().await;
            let versioning = self.versioning.read().await;
            let config_manager = self.compaction_group_manager.read().await;
            match (
                compaction.compaction_statuses.get(&compaction_group_id),
                versioning.current_version.levels.get(&compaction_group_id),
                config_manager.try_get_compaction_group_config(compaction_group_id),
            ) {
                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
                _ => {
                    return vec![];
                }
            }
        };
        let dynamic_level_core = DynamicLevelSelectorCore::new(
            group.compaction_config,
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
        ctx.score_levels
    }
}

impl HummockManager {
    pub fn compaction_event_loop(
        hummock_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let hummock_compaction_event_handler =
            HummockCompactionEventHandler::new(hummock_manager.clone());

        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
            hummock_manager.clone(),
            hummock_compaction_event_handler.clone(),
        );

        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));

        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
            hummock_manager.env.opts.clone(),
            hummock_compaction_event_handler,
            Some(event_tx),
        );

        let event_loop = HummockCompactionEventLoop::new(
            hummock_compaction_event_dispatcher,
            hummock_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

    pub fn add_compactor_stream(
        &self,
        context_id: HummockContextId,
        req_stream: Streaming<SubscribeCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

    pub async fn auto_pick_compaction_group_and_type(
        &self,
    ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                return Some((cg_id, pick_type));
            }
        }

        None
    }

    /// Returns compaction group ids in random order together with a task type.
    /// If any group is blocked by the `write_limit`, a single-element vector with `TaskType::Emergency` is returned.
    /// Otherwise, all groups picked with `TaskType::Dynamic` are returned together; if a group with a
    /// non-dynamic task type is picked before any dynamic one, that single group is returned with its task type.
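    ///
    /// For example (illustrative only): shuffled picks
    /// `[(g1, Dynamic), (g2, SpaceReclaim), (g3, Dynamic)]` yield
    /// `(vec![g1, g3], TaskType::Dynamic)`, while a leading `(g2, SpaceReclaim)`
    /// pick yields `(vec![g2], TaskType::SpaceReclaim)`.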
    async fn auto_pick_compaction_groups_and_type(
        &self,
    ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        let mut normal_groups = vec![];
        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                if pick_type == TaskType::Dynamic {
                    normal_groups.push(cg_id);
                } else if normal_groups.is_empty() {
                    return (vec![cg_id], pick_type);
                }
            }
        }
        (normal_groups, TaskType::Dynamic)
    }
}

impl HummockManager {
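    /// Picks at most `max_select_count` compaction tasks across the given
    /// groups. Trivial-move and trivial-reclaim tasks are applied to the
    /// version immediately and returned together with the picked tasks, while
    /// groups for which no task was picked are returned as unscheduled.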
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // When the last table of a compaction group is deleted, the compaction group (and its
            // config) is destroyed as well. Then a compaction task for this group may come later and
            // cannot find its config.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            // `StoredIdGenerator` already pre-allocates ids in batches of `ID_PREALLOCATE_INTERVAL`.
            let task_id = next_compaction_task_id(&self.env).await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                // lazy initialize.
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
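                // Map the configured compression algorithm name to its numeric
                // code; unrecognized names fall back to 0.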
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // Only GC delete keys in the last level, because lower levels
                    // may still contain older versions of a key.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    max_kv_count_for_xor16: group_config.compaction_config.max_kv_count_for_xor16,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                        version
                            .latest_version()
                            .get_compaction_group_levels(compaction_group_id),
                    )
                    .await?;

                    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();

                    let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = version
                        .latest_version()
                        .safe_epoch_table_watermarks(&table_ids_to_be_compacted)
                        .into_iter()
                        .partition(|(_table_id, table_watermark)| {
                            matches!(
                                table_watermark.watermark_type,
                                WatermarkSerdeType::PkPrefix
                            )
                        });

                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;

                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            all_versioned_table_schemas.get(table_id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID, // deprecated
                        },
                    );

                    pick_tasks.push(compact_task);
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // We are using a single transaction to ensure that each task has progress when it is
            // created.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Initiate heartbeat for the task to track its progress.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // The task was just created; keep it pending until a compactor reports it.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }

    /// Cancels a compaction task, regardless of whether it is assigned or unassigned.
    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore err")
        )));
        let ret = self
            .cancel_compact_task_impl(vec![task_id], task_status)
            .await?;
        Ok(ret[0])
    }

    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }

    async fn cancel_compact_task_impl(
        &self,
        task_ids: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        assert!(CANCEL_STATUS_SET.contains(&task_status));
        let tasks = task_ids
            .into_iter()
            .map(|task_id| ReportTask {
                task_id,
                task_status,
                sorted_output_ssts: vec![],
                table_stats_change: HashMap::default(),
                object_timestamps: HashMap::default(),
            })
            .collect_vec();
        let rets = self.report_compact_tasks(tasks).await?;
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(rets)
    }

    async fn get_compact_tasks(
        &self,
        mut compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));
        compaction_groups.shuffle(&mut thread_rng());
        let (mut tasks, groups) = self
            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
            .await?;
        tasks.retain(|task| {
            if task.task_status == TaskStatus::Success {
                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
                false
            } else {
                true
            }
        });
        Ok((tasks, groups))
    }

    pub async fn get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<Option<CompactTask>> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));

        let (normal_tasks, _) = self
            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
            .await?;
        for task in normal_tasks {
            if task.task_status != TaskStatus::Success {
                return Ok(Some(task));
            }
            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
        }
        Ok(None)
    }

    pub async fn manual_get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<Option<CompactTask>> {
        let mut selector: Box<dyn CompactionSelector> =
            Box::new(ManualCompactionSelector::new(manual_compaction_option));
        self.get_compact_task(compaction_group_id, &mut selector)
            .await
    }

    pub async fn report_compact_task(
        &self,
        task_id: u64,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
        object_timestamps: HashMap<HummockSstableObjectId, u64>,
    ) -> Result<bool> {
        let rets = self
            .report_compact_tasks(vec![ReportTask {
                task_id,
                task_status,
                sorted_output_ssts,
                table_stats_change: table_stats_change.unwrap_or_default(),
                object_timestamps,
            }])
            .await?;
        Ok(rets[0])
    }

    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
        let compaction_guard = self.compaction.write().await;
        let versioning_guard = self.versioning.write().await;

        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
            .await
    }

    /// Finishes or cancels a compaction task, according to `task_status`.
    ///
    /// If `context_id` is not `None`, its validity is checked when writing to the
    /// meta store, as is its ownership of the task.
    ///
    /// A returned `Ok(false)` indicates that either the task was not found, or the
    /// task is not owned by `context_id` when `context_id` is not `None`.
    pub async fn report_compact_tasks_impl(
        &self,
        report_tasks: Vec<ReportTask>,
        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
    ) -> Result<Vec<bool>> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;
        let compaction: &mut Compaction = &mut compaction_guard;
        let start_time = Instant::now();
        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
        let mut rets = vec![false; report_tasks.len()];
        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
        // The compaction task is finished.
        let versioning: &mut Versioning = &mut versioning_guard;
        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");

        // purge stale compact_status
        for group_id in original_keys {
            if !versioning.current_version.levels.contains_key(&group_id) {
                compact_statuses.remove(group_id);
            }
        }
        let mut tasks = vec![];

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        let mut success_count = 0;
        for (idx, task) in report_tasks.into_iter().enumerate() {
            rets[idx] = true;
            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
                None => {
                    tracing::warn!("compact task {} not found", task.task_id);
                    rets[idx] = false;
                    continue;
                }
            };

            {
                // apply result
                compact_task.task_status = task.task_status;
                compact_task.sorted_output_ssts = task.sorted_output_ssts;
            }

            match compact_statuses.get_mut(compact_task.compaction_group_id) {
                Some(mut compact_status) => {
                    compact_status.report_compact_task(&compact_task);
                }
                None => {
                    // When the group_id is not found in the compaction_statuses, it means the group has been removed.
                    // The task is invalid and should be canceled.
                    // e.g.
                    // 1. The group is removed by the user unregistering the tables
                    // 2. The group is removed by the group scheduling algorithm
                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
                }
            }

            let is_success = if let TaskStatus::Success = compact_task.task_status {
                match self
                    .report_compaction_sanity_check(&task.object_timestamps)
                    .await
                {
                    Err(e) => {
                        warn!(
                            "failed to commit compaction task {} {}",
                            compact_task.task_id,
                            e.as_report()
                        );
                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
                        false
                    }
                    _ => {
                        let group = version
                            .latest_version()
                            .levels
                            .get(&compact_task.compaction_group_id)
                            .unwrap();
                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
                        if is_expired {
                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
                            warn!(
                                "The task may be expired because of group split, task:\n {:?}",
                                compact_task_to_string(&compact_task)
                            );
                        }
                        !is_expired
                    }
                }
            } else {
                false
            };
            if is_success {
                success_count += 1;
                version.apply_compact_task(&compact_task);
                if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version())
                {
                    self.metrics.version_stats.reset();
                    versioning.local_metrics.clear();
                }
                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
                trigger_local_table_stat(
                    &self.metrics,
                    &mut versioning.local_metrics,
                    &version_stats,
                    &task.table_stats_change,
                );
            }
            tasks.push(compact_task);
        }
        if success_count > 0 {
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;

            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_report_task"])
                .observe(success_count as f64);
        } else {
            // The compaction task is cancelled or failed.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment
            )?;
        }

        let mut success_groups = vec![];
        for compact_task in &tasks {
            self.compactor_manager
                .remove_task_heartbeat(compact_task.task_id);
            tracing::trace!(
                "Reported compaction task. {}. cost time: {:?}",
                compact_task_to_string(compact_task),
                start_time.elapsed(),
            );

            if !deterministic_mode
                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
            {
                // Only try to send Dynamic compaction requests.
                self.try_send_compaction_request(
                    compact_task.compaction_group_id,
                    compact_task::TaskType::Dynamic,
                );
            }

            if compact_task.task_status == TaskStatus::Success {
                success_groups.push(compact_task.compaction_group_id);
            }
        }

        trigger_compact_tasks_stat(
            &self.metrics,
            &tasks,
            &compaction.compaction_statuses,
            &versioning_guard.current_version,
        );
        drop(versioning_guard);
        if !success_groups.is_empty() {
            self.try_update_write_limits(&success_groups).await;
        }
        Ok(rets)
    }

    /// Triggers compactions on the specified compaction groups.
    /// Does not wait for the compactions to finish.
    pub async fn trigger_compaction_deterministic(
        &self,
        _base_version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        self.on_current_version(|old_version| {
            tracing::info!(
                "Trigger compaction for version {}, groups {:?}",
                old_version.id,
                compaction_groups
            );
        })
        .await;

        if compaction_groups.is_empty() {
            return Ok(());
        }
        for compaction_group in compaction_groups {
            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
        }
        Ok(())
    }

    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<()> {
        let start_time = Instant::now();

        // 1. Get idle compactor.
        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction No compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        // 2. Get manual compaction task.
        let compact_task = self
            .manual_get_compact_task(compaction_group, manual_compaction_option)
            .await;
        let compact_task = match compact_task {
            Ok(Some(compact_task)) => compact_task,
            Ok(None) => {
                // No compaction task available.
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
                    compaction_group
                )
                    .into());
            }
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };

        // 3. Send the task to the compactor.
        let compact_task_string = compact_task_to_string(&compact_task);
        // TODO: do we also need to cancel the task on the meta node?
        compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })?;

        tracing::info!(
            "Trigger manual compaction task. {}. cost time: {:?}",
            &compact_task_string,
            start_time.elapsed(),
        );

        Ok(())
    }

    /// Sends a compaction request.
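    /// Returns `false` (after logging the error) if the request could not be
    /// enqueued into the compaction state.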
    pub fn try_send_compaction_request(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) -> bool {
        match self
            .compaction_state
            .try_sched_compaction(compaction_group, task_type)
        {
            Ok(_) => true,
            Err(e) => {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to send compaction request for compaction group {}",
                    compaction_group,
                );
                false
            }
        }
    }

    /// Apply vnode-aligned compaction for large single-table levels.
    /// This enables one-vnode-per-SST alignment for precise query pruning.
    async fn try_apply_vnode_aligned_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
        levels: &Levels,
    ) -> Result<bool> {
        // Check if vnode-aligned compaction is enabled for this level
        // Only enable for single-table scenarios to avoid cross-table complexity
        let Some(threshold) = compaction_config.vnode_aligned_level_size_threshold else {
            return Ok(false);
        };

        if compact_task.target_level < compact_task.base_level
            || compact_task.existing_table_ids.len() != 1
        {
            return Ok(false);
        }

        // Calculate total size of the entire target level
        let target_level_size = levels
            .get_level(compact_task.target_level as usize)
            .total_file_size;

        if target_level_size < threshold {
            return Ok(false);
        }

        // Enable strict one-vnode-per-SST alignment for single table
        let table_id = compact_task.existing_table_ids[0];

        // Get the actual vnode count from table catalog
        let table = self
            .metadata_manager
            .get_table_catalog_by_ids(&[table_id])
            .await
            .with_context(|| {
                format!(
                    "Failed to get table catalog for table_id {} in compaction_group {}",
                    table_id, compact_task.compaction_group_id
                )
            })
            .map_err(Error::Internal)?
            .into_iter()
            .next()
            .ok_or_else(|| {
                Error::Internal(anyhow::anyhow!(
                    "Table catalog not found for table_id {} in compaction_group {}",
                    table_id,
                    compact_task.compaction_group_id
                ))
            })?;

        compact_task
            .table_vnode_partition
            .insert(table_id, table.vnode_count() as u32);

        Ok(true)
    }

    /// Apply `split_weight_by_vnode` based partition strategy.
    /// This handles dynamic partitioning based on table size and write throughput.
    fn apply_split_weight_by_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
    ) {
        if compaction_config.split_weight_by_vnode > 0 {
            for table_id in &compact_task.existing_table_ids {
                compact_task
                    .table_vnode_partition
                    .insert(*table_id, compact_task.split_weight_by_vnode);
            }

            return;
        }

        // Calculate per-table size from input SSTs
        let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
        let mut existing_table_ids: HashSet<TableId> = HashSet::default();
        for input_ssts in &compact_task.input_ssts {
            for sst in &input_ssts.table_infos {
                existing_table_ids.extend(sst.table_ids.iter());
                for table_id in &sst.table_ids {
                    *table_size_info.entry(*table_id).or_default() +=
                        sst.sst_size / (sst.table_ids.len() as u64);
                }
            }
        }
        compact_task
            .existing_table_ids
            .retain(|table_id| existing_table_ids.contains(table_id));

        let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
        let default_partition_count = self.env.opts.partition_vnode_count;
        let compact_task_table_size_partition_threshold_low = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_low;
        let compact_task_table_size_partition_threshold_high = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_high;

        // Check latest write throughput
        let table_write_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.read();
        let timestamp = chrono::Utc::now().timestamp();

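        // Tier the per-table partition count: very large tables get the default
        // partition count, medium tables (or high-throughput ones above the
        // target file size) get the hybrid count, and anything else above the
        // target file size gets a single partition.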
        for (table_id, compact_table_size) in table_size_info {
            let write_throughput = table_write_throughput_statistic_manager
                .get_table_throughput_descending(table_id, timestamp)
                .peekable()
                .peek()
                .map(|item| item.throughput)
                .unwrap_or(0);

            if compact_table_size > compact_task_table_size_partition_threshold_high
                && default_partition_count > 0
            {
                compact_task
                    .table_vnode_partition
                    .insert(table_id, default_partition_count);
            } else if (compact_table_size > compact_task_table_size_partition_threshold_low
                || (write_throughput > self.env.opts.table_high_write_throughput_threshold
                    && compact_table_size > compaction_config.target_file_size_base))
                && hybrid_vnode_count > 0
            {
                compact_task
                    .table_vnode_partition
                    .insert(table_id, hybrid_vnode_count);
            } else if compact_table_size > compaction_config.target_file_size_base {
                compact_task.table_vnode_partition.insert(table_id, 1);
            }
        }

        compact_task
            .table_vnode_partition
            .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
    }

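    /// Decides how the output SSTs of `compact_task` should be partitioned by
    /// vnode: strict one-vnode-per-SST alignment is tried first for large
    /// single-table levels, and otherwise base-level tasks fall back to the
    /// `split_weight_by_vnode` based strategy.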
    pub(crate) async fn calculate_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
        levels: &Levels,
    ) -> Result<()> {
        // Try vnode-aligned partition first (for large single-table levels)
        if self
            .try_apply_vnode_aligned_partition(compact_task, compaction_config, levels)
            .await?
        {
            return Ok(());
        }

        // Do not split SSTs by vnode partition when target_level > base_level.
        // The purpose of data alignment is mainly to improve the parallelism of
        // base-level compaction and reduce write amplification. At higher levels,
        // SST files are usually larger and contain data of only a single table_id,
        // so there is no need to split them.
1326        if compact_task.target_level > compact_task.base_level {
1327            return Ok(());
1328        }
1329
1330        // Apply split_weight_by_vnode based partition strategy
1331        self.apply_split_weight_by_vnode_partition(compact_task, compaction_config);
1332
1333        Ok(())
1334    }
1335
    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockManager {
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction_guard = self.compaction.read().await;
        let assignment_ref = &compaction_guard.compact_task_assignment;
        assignment_ref.get(&task_id).cloned()
    }

    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let mut guard = self.compaction.write().await;
            guard.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0.into(),
                },
            );
        }

        // In tests, the compact task may have been modified directly while
        // `compact_task_assignment` was left untouched, so we pass the modified
        // `compact_task` straight into `report_compact_task_impl`.
        self.report_compact_tasks(vec![ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        }])
        .await?;
        Ok(())
    }
}

#[derive(Debug, Default)]
pub struct CompactionState {
    scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
}

impl CompactionState {
    pub fn new() -> Self {
        Self {
            scheduled: Default::default(),
        }
    }

    /// Enqueues only if the target is not yet in the queue.
    pub fn try_sched_compaction(
        &self,
        compaction_group: CompactionGroupId,
        task_type: TaskType,
    ) -> std::result::Result<bool, SendError<CompactionRequestChannelItem>> {
        let mut guard = self.scheduled.lock();
        let key = (compaction_group, task_type);
        if guard.contains(&key) {
            return Ok(false);
        }
        guard.insert(key);
        Ok(true)
    }

    pub fn unschedule(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) {
        self.scheduled.lock().remove(&(compaction_group, task_type));
    }

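    /// Returns the highest-priority task type currently scheduled for `group`,
    /// checked in the fixed order Dynamic > SpaceReclaim > Ttl > Tombstone >
    /// VnodeWatermark.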
1420    pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1421        let guard = self.scheduled.lock();
1422        if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
1423            Some(compact_task::TaskType::Dynamic)
1424        } else if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
1425            Some(compact_task::TaskType::SpaceReclaim)
1426        } else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
1427            Some(compact_task::TaskType::Ttl)
1428        } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
1429            Some(compact_task::TaskType::Tombstone)
1430        } else if guard.contains(&(group, compact_task::TaskType::VnodeWatermark)) {
1431            Some(compact_task::TaskType::VnodeWatermark)
1432        } else {
1433            None
1434        }
1435    }
1436}
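
// A minimal, hedged sketch of the dedup contract above; it assumes
// `CompactionGroupId` is `Copy + Default` (a plain id type in
// `risingwave_hummock_sdk`).
#[cfg(test)]
mod compaction_state_dedup_test {
    use super::*;

    #[test]
    fn try_sched_compaction_deduplicates() {
        let state = CompactionState::new();
        let group = CompactionGroupId::default();
        // The first enqueue succeeds; the duplicate is rejected.
        assert!(state.try_sched_compaction(group, TaskType::Dynamic).unwrap());
        assert!(!state.try_sched_compaction(group, TaskType::Dynamic).unwrap());
        // After unscheduling, the same pair can be enqueued again.
        state.unschedule(group, TaskType::Dynamic);
        assert!(state.try_sched_compaction(group, TaskType::Dynamic).unwrap());
    }
}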

impl Compaction {
    pub fn get_compact_task_assignments_by_group_id(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<CompactTaskAssignment> {
        self.compact_task_assignment
            .values()
            .filter(|assignment| {
                assignment
                    .compact_task
                    .as_ref()
                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
            })
            .cloned()
            .collect()
    }
}

#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    pub group_id: CompactionGroupId,
    pub group_size: u64,
    pub table_statistic: BTreeMap<StateTableId, u64>,
    pub compaction_group_config: CompactionGroup,
}

/// Updates table stats caused by vnode watermark trivial reclaim compaction.
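/// Deleted key counts are subtracted exactly, while key/value sizes are scaled
/// by the surviving-key ratio. Worked example (hypothetical numbers): with
/// `total_key_count = 1000`, `total_key_size = 10_000` and 250 reclaimed keys,
/// the ratio is `750 / 1000 = 0.75`, so `total_key_size` becomes
/// `ceil(10_000 * 0.75) = 7_500`.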
fn update_table_stats_for_vnode_watermark_trivial_reclaim(
    table_stats: &mut PbTableStatsMap,
    task: &CompactTask,
) {
    if task.task_type != TaskType::VnodeWatermark {
        return;
    }
    let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
        assert_eq!(s.table_ids.len(), 1);
        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
        *e += s.total_key_count;
    }
    for (table_id, delete_count) in deleted_table_keys {
        let Some(stats) = table_stats.get_mut(&table_id) else {
            continue;
        };
        if stats.total_key_count == 0 {
            continue;
        }
        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
        // total_key_count is updated exactly.
        stats.total_key_count = new_total_key_count;
        // The key/value sizes are updated approximately, scaled by the ratio.
        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
    }
}

#[derive(Debug, Clone)]
pub enum GroupState {
    /// The compaction group is in neither the emergency nor the write-stop state.
    Normal,

    /// The compaction group is in the emergency state.
    Emergency(String), // reason

    /// The compaction group is in the write-stop state.
    WriteStop(String), // reason
}

impl GroupState {
    pub fn is_write_stop(&self) -> bool {
        matches!(self, Self::WriteStop(_))
    }

    pub fn is_emergency(&self) -> bool {
        matches!(self, Self::Emergency(_))
    }

    pub fn reason(&self) -> Option<&str> {
        match self {
            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
            _ => None,
        }
    }
}

#[derive(Clone, Default)]
pub struct GroupStateValidator;

impl GroupStateValidator {
    pub fn write_stop_sub_level_count(
        level_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
        level_count > threshold
    }

    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
        l0_size
            > compaction_config
                .level0_stop_write_threshold_max_size
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
    }

    pub fn write_stop_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .level0_stop_write_threshold_max_sst_count
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
                as usize
    }

    pub fn emergency_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .emergency_level0_sst_file_count
                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
                as usize
    }

    pub fn emergency_l0_partition_count(
        last_l0_sub_level_partition_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        last_l0_sub_level_partition_count
            > compaction_config
                .emergency_level0_sub_level_partition
                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
                as usize
    }

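    /// Evaluates the write-stop triggers in order: L0 sub-level count, then L0
    /// SST file count, then total L0 size; the first exceeded threshold
    /// determines the reported reason.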
    pub fn check_single_group_write_stop(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
                levels.l0.sub_levels.len(),
                compaction_config.level0_stop_write_threshold_sub_level_number
            ));
        }

        if Self::write_stop_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .level0_stop_write_threshold_max_sst_count
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
            ));
        }

        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
                levels.l0.total_file_size,
                compaction_config
                    .level0_stop_write_threshold_max_size
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
            ));
        }

        GroupState::Normal
    }

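    /// Evaluates the emergency triggers in order: L0 SST file count, then the
    /// partition count of the first L0 sub-level.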
    pub fn check_single_group_emergency(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::emergency_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .emergency_level0_sst_file_count
                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
            ));
        }

        if Self::emergency_l0_partition_count(
            levels
                .l0
                .sub_levels
                .first()
                .map(|l| l.table_infos.len())
                .unwrap_or(0),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
                levels
                    .l0
                    .sub_levels
                    .first()
                    .map(|l| l.table_infos.len())
                    .unwrap_or(0),
                compaction_config
                    .emergency_level0_sub_level_partition
                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
            ));
        }

        GroupState::Normal
    }

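    /// Write-stop conditions are checked first, so `WriteStop` takes
    /// precedence over `Emergency` when both sets of thresholds are exceeded.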
    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
        let state = Self::check_single_group_write_stop(levels, compaction_config);
        if state.is_write_stop() {
            return state;
        }

        Self::check_single_group_emergency(levels, compaction_config)
    }
}
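
// A minimal, hedged sketch of the threshold semantics above: the checks are
// strict ("greater than"), so a count equal to the configured limit does not
// yet trigger a write stop. The config value here is hypothetical.
#[cfg(test)]
mod group_state_validator_threshold_test {
    use super::*;

    #[test]
    fn sub_level_count_threshold_is_strict() {
        let config = CompactionConfig {
            level0_stop_write_threshold_sub_level_number: 4,
            ..Default::default()
        };
        assert!(!GroupStateValidator::write_stop_sub_level_count(4, &config));
        assert!(GroupStateValidator::write_stop_sub_level_count(5, &config));
    }
}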