risingwave_meta/hummock/manager/compaction/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::{Arc, LazyLock};
17use std::time::Instant;
18
19use anyhow::Context;
20use compaction_event_loop::{
21    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
22    HummockCompactorDedicatedEventLoop,
23};
24use fail::fail_point;
25use itertools::Itertools;
26use parking_lot::Mutex;
27use rand::rng as thread_rng;
28use rand::seq::SliceRandom;
29use risingwave_common::catalog::TableId;
30use risingwave_common::config::meta::default::compaction_config;
31use risingwave_common::hash::VnodeCountCompat;
32use risingwave_common::util::epoch::Epoch;
33use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
34use risingwave_hummock_sdk::compaction_group::StateTableId;
35use risingwave_hummock_sdk::key_range::KeyRange;
36use risingwave_hummock_sdk::level::Levels;
37use risingwave_hummock_sdk::sstable_info::SstableInfo;
38use risingwave_hummock_sdk::table_stats::{
39    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
40};
41use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
42use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
43use risingwave_hummock_sdk::{
44    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
45    HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
46};
47use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
48use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
49use risingwave_pb::hummock::{
50    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
51    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
52};
53use thiserror_ext::AsReport;
54use tokio::sync::RwLockWriteGuard;
55use tokio::sync::mpsc::UnboundedReceiver;
56use tokio::sync::mpsc::error::SendError;
57use tokio::sync::oneshot::Sender;
58use tokio::task::JoinHandle;
59use tonic::Streaming;
60use tracing::warn;
61
62use crate::hummock::compaction::selector::level_selector::PickerInfo;
63use crate::hummock::compaction::selector::{
64    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
65    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
66    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
67};
68use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
69use crate::hummock::error::{Error, Result};
70use crate::hummock::manager::transaction::{
71    HummockVersionStatsTransaction, HummockVersionTransaction,
72};
73use crate::hummock::manager::versioning::Versioning;
74use crate::hummock::metrics_utils::{
75    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
76    trigger_local_table_stat,
77};
78use crate::hummock::model::CompactionGroup;
79use crate::hummock::sequence::next_compaction_task_id;
80use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
81use crate::manager::META_NODE_ID;
82use crate::model::BTreeMapTransaction;
83
84pub mod compaction_event_loop;
85pub mod compaction_group_manager;
86pub mod compaction_group_schedule;
87
88static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
89    [
90        TaskStatus::ManualCanceled,
91        TaskStatus::SendFailCanceled,
92        TaskStatus::AssignFailCanceled,
93        TaskStatus::HeartbeatCanceled,
94        TaskStatus::InvalidGroupCanceled,
95        TaskStatus::NoAvailMemoryResourceCanceled,
96        TaskStatus::NoAvailCpuResourceCanceled,
97        TaskStatus::HeartbeatProgressCanceled,
98    ]
99    .into_iter()
100    .collect()
101});
102
/// A compaction group id paired with the type of compaction task requested for it.
///
/// NOTE(review): judging by the name this is the item sent over the compaction request
/// channel; the channel itself is not visible in this part of the file — confirm at the use site.
type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);
104
105fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
106    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
107        HashMap::default();
108    compaction_selectors.insert(
109        compact_task::TaskType::Dynamic,
110        Box::<DynamicLevelSelector>::default(),
111    );
112    compaction_selectors.insert(
113        compact_task::TaskType::SpaceReclaim,
114        Box::<SpaceReclaimCompactionSelector>::default(),
115    );
116    compaction_selectors.insert(
117        compact_task::TaskType::Ttl,
118        Box::<TtlCompactionSelector>::default(),
119    );
120    compaction_selectors.insert(
121        compact_task::TaskType::Tombstone,
122        Box::<TombstoneCompactionSelector>::default(),
123    );
124    compaction_selectors.insert(
125        compact_task::TaskType::VnodeWatermark,
126        Box::<VnodeWatermarkCompactionSelector>::default(),
127    );
128    compaction_selectors
129}
130
131impl HummockVersionTransaction<'_> {
132    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
133        let mut version_delta = self.new_delta();
134        let trivial_move = compact_task.is_trivial_move_task();
135        version_delta.trivial_move = trivial_move;
136
137        let group_deltas = &mut version_delta
138            .group_deltas
139            .entry(compact_task.compaction_group_id)
140            .or_default()
141            .group_deltas;
142        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
143            BTreeMap::default();
144
145        for level in &compact_task.input_ssts {
146            let level_idx = level.level_idx;
147
148            removed_table_ids_map
149                .entry(level_idx)
150                .or_default()
151                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
152        }
153
154        for (level_idx, removed_table_ids) in removed_table_ids_map {
155            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
156                level_idx,
157                0, // default
158                removed_table_ids,
159                vec![], // default
160                0,      // default
161                compact_task.compaction_group_version_id,
162            ));
163
164            group_deltas.push(group_delta);
165        }
166
167        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
168            compact_task.target_level,
169            compact_task.target_sub_level_id,
170            HashSet::new(), // default
171            compact_task.sorted_output_ssts.clone(),
172            compact_task.split_weight_by_vnode,
173            compact_task.compaction_group_version_id,
174        ));
175
176        group_deltas.push(group_delta);
177        version_delta.pre_apply();
178    }
179}
180
/// Compaction bookkeeping: the recorded task assignments and the per-group compaction
/// status.
#[derive(Default)]
pub struct Compaction {
    /// Compaction task that is already assigned to a compactor
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// `CompactStatus` of each compaction group
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    // NOTE(review): not read anywhere in the visible part of this file; the leading
    // underscore suggests it is intentionally unused — confirm before relying on it.
    pub _deterministic_mode: bool,
}
190
191impl HummockManager {
192    pub async fn get_assigned_compact_task_num(&self) -> u64 {
193        self.compaction.read().await.compact_task_assignment.len() as u64
194    }
195
196    pub async fn list_compaction_status(
197        &self,
198    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
199        let compaction = self.compaction.read().await;
200        (
201            compaction.compaction_statuses.values().map_into().collect(),
202            compaction
203                .compact_task_assignment
204                .values()
205                .cloned()
206                .collect(),
207        )
208    }
209
210    pub async fn get_compaction_scores(
211        &self,
212        compaction_group_id: CompactionGroupId,
213    ) -> Vec<PickerInfo> {
214        let (status, levels, group) = {
215            let compaction = self.compaction.read().await;
216            let versioning = self.versioning.read().await;
217            let config_manager = self.compaction_group_manager.read().await;
218            match (
219                compaction.compaction_statuses.get(&compaction_group_id),
220                versioning.current_version.levels.get(&compaction_group_id),
221                config_manager.try_get_compaction_group_config(compaction_group_id),
222            ) {
223                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
224                _ => {
225                    return vec![];
226                }
227            }
228        };
229        let dynamic_level_core = DynamicLevelSelectorCore::new(
230            group.compaction_config,
231            Arc::new(CompactionDeveloperConfig::default()),
232        );
233        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
234        ctx.score_levels
235    }
236}
237
238impl HummockManager {
239    pub fn compaction_event_loop(
240        hummock_manager: Arc<Self>,
241        compactor_streams_change_rx: UnboundedReceiver<(
242            HummockContextId,
243            Streaming<SubscribeCompactionEventRequest>,
244        )>,
245    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
246        let mut join_handle_vec = Vec::default();
247
248        let hummock_compaction_event_handler =
249            HummockCompactionEventHandler::new(hummock_manager.clone());
250
251        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
252            hummock_manager.clone(),
253            hummock_compaction_event_handler.clone(),
254        );
255
256        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
257        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));
258
259        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
260            hummock_manager.env.opts.clone(),
261            hummock_compaction_event_handler,
262            Some(event_tx),
263        );
264
265        let event_loop = HummockCompactionEventLoop::new(
266            hummock_compaction_event_dispatcher,
267            hummock_manager.metrics.clone(),
268            compactor_streams_change_rx,
269        );
270
271        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
272        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
273
274        join_handle_vec
275    }
276
277    pub fn add_compactor_stream(
278        &self,
279        context_id: HummockContextId,
280        req_stream: Streaming<SubscribeCompactionEventRequest>,
281    ) {
282        self.compactor_streams_change_tx
283            .send((context_id, req_stream))
284            .unwrap();
285    }
286
287    pub async fn auto_pick_compaction_group_and_type(
288        &self,
289    ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
290        let mut compaction_group_ids = self.compaction_group_ids().await;
291        compaction_group_ids.shuffle(&mut thread_rng());
292
293        for cg_id in compaction_group_ids {
294            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
295                return Some((cg_id, pick_type));
296            }
297        }
298
299        None
300    }
301
302    /// This method will return all compaction group id in a random order and task type. If there are any group block by `write_limit`, it will return a single array with `TaskType::Emergency`.
303    /// If these groups get different task-type, it will return all group id with `TaskType::Dynamic` if the first group get `TaskType::Dynamic`, otherwise it will return the single group with other task type.
304    async fn auto_pick_compaction_groups_and_type(
305        &self,
306    ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
307        let mut compaction_group_ids = self.compaction_group_ids().await;
308        compaction_group_ids.shuffle(&mut thread_rng());
309
310        let mut normal_groups = vec![];
311        for cg_id in compaction_group_ids {
312            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
313                if pick_type == TaskType::Dynamic {
314                    normal_groups.push(cg_id);
315                } else if normal_groups.is_empty() {
316                    return (vec![cg_id], pick_type);
317                }
318            }
319        }
320        (normal_groups, TaskType::Dynamic)
321    }
322}
323
324impl HummockManager {
    /// Picks compaction tasks for `compaction_groups` using `selector`.
    ///
    /// The groups are visited in the given order while the write locks on `compaction` and
    /// `versioning` are held. Groups that are missing from the latest version or that have
    /// no compaction group config are skipped. For every remaining group the selector is
    /// asked for tasks repeatedly:
    ///
    /// * A trivial-reclaim task, or a trivial-move task when the selector's task type is
    ///   `Dynamic` or `Emergency`, is marked `Success`, reported to the group's
    ///   `CompactStatus` and applied to the version right away. The whole pick loop stops
    ///   once `max_trivial_move_task_count_per_loop` such tasks have been collected.
    /// * Any other task is completed with vnode partition info, table watermarks and table
    ///   schemas, recorded in `compact_task_assignment`, and ends the picking for this
    ///   group.
    ///
    /// No further group is visited once `max_select_count` non-trivial tasks were picked.
    ///
    /// # Returns
    ///
    /// `(tasks, unschedule_groups)` where `tasks` contains the non-trivial tasks followed
    /// by the trivial ones, and `unschedule_groups` lists the visited groups for which no
    /// non-trivial task was picked.
    ///
    /// # Errors
    ///
    /// Returns an error if loading the versioned table schemas, allocating a task id,
    /// `calculate_vnode_partition`, or committing to the meta store fails.
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        // Table schemas are only loaded when dropped-column reclaim is enabled; otherwise
        // the tasks are created without schema information.
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            // Skip groups that do not exist in the latest version.
            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // When the last table of a compaction group is deleted, the compaction group (and its
            // config) is destroyed as well. Then a compaction task for this group may come later and
            // cannot find its config.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL.
            // Note that this single id is used for every task created for this group below.
            let task_id = next_compaction_task_id(&self.env).await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                // lazy initialize.
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            // Trivial moves are only applied in place for `Dynamic` and `Emergency` selectors.
            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            // Collect the table options of the group's member tables; tables without a
            // registered option are simply left out.
            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            // Keep asking the selector for tasks: trivial tasks are applied immediately and
            // the loop continues, the first non-trivial task ends the loop for this group.
            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                // Map the algorithm name to its numeric code; unknown names map to 0.
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // only gc delete keys in last level because there may be older version in more bottom
                    // level.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    max_kv_count_for_xor16: group_config.compaction_config.max_kv_count_for_xor16,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    // A trivial task needs no compactor: it is finished here and applied
                    // to the version directly.
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    // A trivial move outputs exactly the SSTs of its first input level; a
                    // trivial reclaim keeps the output empty.
                    if !is_trivial_reclaim {
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    // Bound the number of trivial tasks handled in one call; this stops the
                    // iteration over all remaining groups as well.
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                        version
                            .latest_version()
                            .get_compaction_group_levels(compaction_group_id),
                    )
                    .await?;

                    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();

                    // Split the relevant table watermarks by their serde type.
                    let mut pk_prefix_table_watermarks = BTreeMap::default();
                    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
                    let mut value_table_watermarks = BTreeMap::default();
                    for (table_id, watermark) in version
                        .latest_version()
                        .safe_epoch_table_watermarks(&table_ids_to_be_compacted)
                    {
                        match watermark.watermark_type {
                            WatermarkSerdeType::PkPrefix => {
                                pk_prefix_table_watermarks.insert(table_id, watermark);
                            }
                            WatermarkSerdeType::NonPkPrefix => {
                                non_pk_prefix_table_watermarks.insert(table_id, watermark);
                            }
                            WatermarkSerdeType::Value => {
                                value_table_watermarks.insert(table_id, watermark);
                            }
                        }
                    }
                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;
                    compact_task.value_table_watermarks = value_table_watermarks;

                    // Attach the schema of every member table for which one was loaded.
                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            all_versioned_table_schemas.get(table_id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID, // deprecated
                        },
                    );

                    // At most one non-trivial task is picked per group.
                    pick_tasks.push(compact_task);
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            // The group is reported as unscheduled when no non-trivial task was picked for it.
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            // Trivial tasks changed the version and its stats, so these are committed
            // together with the statuses and assignments.
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // We are using a single transaction to ensure that each task has progress when it is
            // created.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Initiate heartbeat for the task to track its progress.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // The picked task is handed out in `Pending` state.
            // NOTE(review): the task was already constructed with `TaskStatus::Pending`
            // above, so this assignment looks redundant unless `calculate_vnode_partition`
            // changes the status — confirm.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        // Non-trivial tasks come first in the result, followed by the trivial ones.
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }
725
726    /// Cancels a compaction task no matter it's assigned or unassigned.
727    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
728        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
729            anyhow::anyhow!("failpoint metastore err")
730        )));
731        let ret = self
732            .cancel_compact_task_impl(vec![task_id], task_status)
733            .await?;
734        Ok(ret[0])
735    }
736
    /// Cancels a batch of compaction tasks, all with the same `task_status`.
    ///
    /// Delegates to `cancel_compact_task_impl` and returns its result unchanged.
    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }
744
745    async fn cancel_compact_task_impl(
746        &self,
747        task_ids: Vec<u64>,
748        task_status: TaskStatus,
749    ) -> Result<Vec<bool>> {
750        assert!(CANCEL_STATUS_SET.contains(&task_status));
751        let tasks = task_ids
752            .into_iter()
753            .map(|task_id| ReportTask {
754                task_id,
755                task_status,
756                sorted_output_ssts: vec![],
757                table_stats_change: HashMap::default(),
758                object_timestamps: HashMap::default(),
759            })
760            .collect_vec();
761        let rets = self.report_compact_tasks(tasks).await?;
762        #[cfg(test)]
763        {
764            self.check_state_consistency().await;
765        }
766        Ok(rets)
767    }
768
769    async fn get_compact_tasks(
770        &self,
771        mut compaction_groups: Vec<CompactionGroupId>,
772        max_select_count: usize,
773        selector: &mut Box<dyn CompactionSelector>,
774    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
775        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
776            anyhow::anyhow!("failpoint metastore error")
777        )));
778        compaction_groups.shuffle(&mut thread_rng());
779        let (mut tasks, groups) = self
780            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
781            .await?;
782        tasks.retain(|task| {
783            if task.task_status == TaskStatus::Success {
784                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
785                false
786            } else {
787                true
788            }
789        });
790        Ok((tasks, groups))
791    }
792
793    pub async fn get_compact_task(
794        &self,
795        compaction_group_id: CompactionGroupId,
796        selector: &mut Box<dyn CompactionSelector>,
797    ) -> Result<Option<CompactTask>> {
798        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
799            anyhow::anyhow!("failpoint metastore error")
800        )));
801
802        let (normal_tasks, _) = self
803            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
804            .await?;
805        for task in normal_tasks {
806            if task.task_status != TaskStatus::Success {
807                return Ok(Some(task));
808            }
809            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
810        }
811        Ok(None)
812    }
813
814    pub async fn manual_get_compact_task(
815        &self,
816        compaction_group_id: CompactionGroupId,
817        manual_compaction_option: ManualCompactionOption,
818    ) -> Result<Option<CompactTask>> {
819        let mut selector: Box<dyn CompactionSelector> =
820            Box::new(ManualCompactionSelector::new(manual_compaction_option));
821        self.get_compact_task(compaction_group_id, &mut selector)
822            .await
823    }
824
825    pub async fn report_compact_task(
826        &self,
827        task_id: u64,
828        task_status: TaskStatus,
829        sorted_output_ssts: Vec<SstableInfo>,
830        table_stats_change: Option<PbTableStatsMap>,
831        object_timestamps: HashMap<HummockSstableObjectId, u64>,
832    ) -> Result<bool> {
833        let rets = self
834            .report_compact_tasks(vec![ReportTask {
835                task_id,
836                task_status,
837                sorted_output_ssts,
838                table_stats_change: table_stats_change.unwrap_or_default(),
839                object_timestamps,
840            }])
841            .await?;
842        Ok(rets[0])
843    }
844
    /// Reports a batch of compaction task results.
    ///
    /// Acquires the write lock on `self.compaction` and then the write lock on
    /// `self.versioning`, and passes both guards to `report_compact_tasks_impl`, whose result is
    /// returned unchanged.
    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
        // Lock order: compaction first, versioning second.
        let compaction_guard = self.compaction.write().await;
        let versioning_guard = self.versioning.write().await;

        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
            .await
    }
852
853    /// Finishes or cancels a compaction task, according to `task_status`.
854    ///
855    /// If `context_id` is not None, its validity will be checked when writing meta store.
856    /// Its ownership of the task is checked as well.
857    ///
858    /// Return Ok(false) indicates either the task is not found,
859    /// or the task is not owned by `context_id` when `context_id` is not None.
860    pub async fn report_compact_tasks_impl(
861        &self,
862        report_tasks: Vec<ReportTask>,
863        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
864        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
865    ) -> Result<Vec<bool>> {
866        let deterministic_mode = self.env.opts.compaction_deterministic_test;
867        let compaction: &mut Compaction = &mut compaction_guard;
868        let start_time = Instant::now();
869        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
870        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
871        let mut rets = vec![false; report_tasks.len()];
872        let mut compact_task_assignment =
873            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
874        // The compaction task is finished.
875        let versioning: &mut Versioning = &mut versioning_guard;
876        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");
877
878        // purge stale compact_status
879        for group_id in original_keys {
880            if !versioning.current_version.levels.contains_key(&group_id) {
881                compact_statuses.remove(group_id);
882            }
883        }
884        let mut tasks = vec![];
885
886        let mut version = HummockVersionTransaction::new(
887            &mut versioning.current_version,
888            &mut versioning.hummock_version_deltas,
889            self.env.notification_manager(),
890            None,
891            &self.metrics,
892        );
893
894        if deterministic_mode {
895            version.disable_apply_to_txn();
896        }
897
898        let mut version_stats = HummockVersionStatsTransaction::new(
899            &mut versioning.version_stats,
900            self.env.notification_manager(),
901        );
902        let mut success_count = 0;
903        for (idx, task) in report_tasks.into_iter().enumerate() {
904            rets[idx] = true;
905            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
906                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
907                None => {
908                    tracing::warn!("{}", format!("compact task {} not found", task.task_id));
909                    rets[idx] = false;
910                    continue;
911                }
912            };
913
914            {
915                // apply result
916                compact_task.task_status = task.task_status;
917                compact_task.sorted_output_ssts = task.sorted_output_ssts;
918            }
919
920            match compact_statuses.get_mut(compact_task.compaction_group_id) {
921                Some(mut compact_status) => {
922                    compact_status.report_compact_task(&compact_task);
923                }
924                None => {
925                    // When the group_id is not found in the compaction_statuses, it means the group has been removed.
926                    // The task is invalid and should be canceled.
927                    // e.g.
928                    // 1. The group is removed by the user unregistering the tables
929                    // 2. The group is removed by the group scheduling algorithm
930                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
931                }
932            }
933
934            let is_success = if let TaskStatus::Success = compact_task.task_status {
935                match self
936                    .report_compaction_sanity_check(&task.object_timestamps)
937                    .await
938                {
939                    Err(e) => {
940                        warn!(
941                            "failed to commit compaction task {} {}",
942                            compact_task.task_id,
943                            e.as_report()
944                        );
945                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
946                        false
947                    }
948                    _ => {
949                        let group = version
950                            .latest_version()
951                            .levels
952                            .get(&compact_task.compaction_group_id)
953                            .unwrap();
954                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
955                        if is_expired {
956                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
957                            warn!(
958                                "The task may be expired because of group split, task:\n {:?}",
959                                compact_task_to_string(&compact_task)
960                            );
961                        }
962                        !is_expired
963                    }
964                }
965            } else {
966                false
967            };
968            if is_success {
969                success_count += 1;
970                version.apply_compact_task(&compact_task);
971                if purge_prost_table_stats(
972                    &mut version_stats.table_stats,
973                    version.latest_version(),
974                    &HashSet::default(),
975                ) {
976                    self.metrics.version_stats.reset();
977                    versioning.local_metrics.clear();
978                }
979                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
980                trigger_local_table_stat(
981                    &self.metrics,
982                    &mut versioning.local_metrics,
983                    &version_stats,
984                    &task.table_stats_change,
985                );
986            }
987            tasks.push(compact_task);
988        }
989        if success_count > 0 {
990            commit_multi_var!(
991                self.meta_store_ref(),
992                compact_statuses,
993                compact_task_assignment,
994                version,
995                version_stats
996            )?;
997
998            self.metrics
999                .compact_task_batch_count
1000                .with_label_values(&["batch_report_task"])
1001                .observe(success_count as f64);
1002        } else {
1003            // The compaction task is cancelled or failed.
1004            commit_multi_var!(
1005                self.meta_store_ref(),
1006                compact_statuses,
1007                compact_task_assignment
1008            )?;
1009        }
1010
1011        let mut success_groups = vec![];
1012        for compact_task in &tasks {
1013            self.compactor_manager
1014                .remove_task_heartbeat(compact_task.task_id);
1015            tracing::trace!(
1016                "Reported compaction task. {}. cost time: {:?}",
1017                compact_task_to_string(compact_task),
1018                start_time.elapsed(),
1019            );
1020
1021            if !deterministic_mode
1022                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
1023                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
1024            {
1025                // only try send Dynamic compaction
1026                self.try_send_compaction_request(
1027                    compact_task.compaction_group_id,
1028                    compact_task::TaskType::Dynamic,
1029                );
1030            }
1031
1032            if compact_task.task_status == TaskStatus::Success {
1033                success_groups.push(compact_task.compaction_group_id);
1034            }
1035        }
1036
1037        trigger_compact_tasks_stat(
1038            &self.metrics,
1039            &tasks,
1040            &compaction.compaction_statuses,
1041            &versioning_guard.current_version,
1042        );
1043        drop(versioning_guard);
1044        if !success_groups.is_empty() {
1045            self.try_update_write_limits(&success_groups).await;
1046        }
1047        Ok(rets)
1048    }
1049
1050    /// Triggers compacitons to specified compaction groups.
1051    /// Don't wait for compaction finish
1052    pub async fn trigger_compaction_deterministic(
1053        &self,
1054        _base_version_id: HummockVersionId,
1055        compaction_groups: Vec<CompactionGroupId>,
1056    ) -> Result<()> {
1057        self.on_current_version(|old_version| {
1058            tracing::info!(
1059                "Trigger compaction for version {}, groups {:?}",
1060                old_version.id,
1061                compaction_groups
1062            );
1063        })
1064        .await;
1065
1066        if compaction_groups.is_empty() {
1067            return Ok(());
1068        }
1069        for compaction_group in compaction_groups {
1070            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1071        }
1072        Ok(())
1073    }
1074
1075    pub async fn trigger_manual_compaction(
1076        &self,
1077        compaction_group: CompactionGroupId,
1078        manual_compaction_option: ManualCompactionOption,
1079    ) -> Result<()> {
1080        let start_time = Instant::now();
1081
1082        // 1. Get idle compactor.
1083        let compactor = match self.compactor_manager.next_compactor() {
1084            Some(compactor) => compactor,
1085            None => {
1086                tracing::warn!("trigger_manual_compaction No compactor is available.");
1087                return Err(anyhow::anyhow!(
1088                    "trigger_manual_compaction No compactor is available. compaction_group {}",
1089                    compaction_group
1090                )
1091                .into());
1092            }
1093        };
1094
1095        // 2. Get manual compaction task.
1096        let compact_task = self
1097            .manual_get_compact_task(compaction_group, manual_compaction_option)
1098            .await;
1099        let compact_task = match compact_task {
1100            Ok(Some(compact_task)) => compact_task,
1101            Ok(None) => {
1102                // No compaction task available.
1103                return Err(anyhow::anyhow!(
1104                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
1105                    compaction_group
1106                )
1107                    .into());
1108            }
1109            Err(err) => {
1110                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
1111
1112                return Err(anyhow::anyhow!(err)
1113                    .context(format!(
1114                        "Failed to get compaction task for compaction_group {}",
1115                        compaction_group,
1116                    ))
1117                    .into());
1118            }
1119        };
1120
1121        // 3. send task to compactor
1122        let compact_task_string = compact_task_to_string(&compact_task);
1123        // TODO: shall we need to cancel on meta ?
1124        compactor
1125            .send_event(ResponseEvent::CompactTask(compact_task.into()))
1126            .with_context(|| {
1127                format!(
1128                    "Failed to trigger compaction task for compaction_group {}",
1129                    compaction_group,
1130                )
1131            })?;
1132
1133        tracing::info!(
1134            "Trigger manual compaction task. {}. cost time: {:?}",
1135            &compact_task_string,
1136            start_time.elapsed(),
1137        );
1138
1139        Ok(())
1140    }
1141
1142    /// Sends a compaction request.
1143    pub fn try_send_compaction_request(
1144        &self,
1145        compaction_group: CompactionGroupId,
1146        task_type: compact_task::TaskType,
1147    ) -> bool {
1148        match self
1149            .compaction_state
1150            .try_sched_compaction(compaction_group, task_type)
1151        {
1152            Ok(_) => true,
1153            Err(e) => {
1154                tracing::error!(
1155                    error = %e.as_report(),
1156                    "failed to send compaction request for compaction group {}",
1157                    compaction_group,
1158                );
1159                false
1160            }
1161        }
1162    }
1163
1164    /// Apply vnode-aligned compaction for large single-table levels.
1165    /// This enables one-vnode-per-SST alignment for precise query pruning.
1166    async fn try_apply_vnode_aligned_partition(
1167        &self,
1168        compact_task: &mut CompactTask,
1169        compaction_config: &CompactionConfig,
1170        levels: &Levels,
1171    ) -> Result<bool> {
1172        // Check if vnode-aligned compaction is enabled for this level
1173        // Only enable for single-table scenarios to avoid cross-table complexity
1174        let Some(threshold) = compaction_config.vnode_aligned_level_size_threshold else {
1175            return Ok(false);
1176        };
1177
1178        if compact_task.target_level < compact_task.base_level
1179            || compact_task.existing_table_ids.len() != 1
1180        {
1181            return Ok(false);
1182        }
1183
1184        // Calculate total size of the entire target level
1185        let target_level_size = levels
1186            .get_level(compact_task.target_level as usize)
1187            .total_file_size;
1188
1189        if target_level_size < threshold {
1190            return Ok(false);
1191        }
1192
1193        // Enable strict one-vnode-per-SST alignment for single table
1194        let table_id = compact_task.existing_table_ids[0];
1195
1196        // Get the actual vnode count from table catalog
1197        let table = self
1198            .metadata_manager
1199            .get_table_catalog_by_ids(&[table_id])
1200            .await
1201            .with_context(|| {
1202                format!(
1203                    "Failed to get table catalog for table_id {} in compaction_group {}",
1204                    table_id, compact_task.compaction_group_id
1205                )
1206            })
1207            .map_err(Error::Internal)?
1208            .into_iter()
1209            .next()
1210            .ok_or_else(|| {
1211                Error::Internal(anyhow::anyhow!(
1212                    "Table catalog not found for table_id {} in compaction_group {}",
1213                    table_id,
1214                    compact_task.compaction_group_id
1215                ))
1216            })?;
1217
1218        compact_task
1219            .table_vnode_partition
1220            .insert(table_id, table.vnode_count() as u32);
1221
1222        Ok(true)
1223    }
1224
1225    /// Apply `split_weight_by_vnode` based partition strategy.
1226    /// This handles dynamic partitioning based on table size and write throughput.
1227    fn apply_split_weight_by_vnode_partition(
1228        &self,
1229        compact_task: &mut CompactTask,
1230        compaction_config: &CompactionConfig,
1231    ) {
1232        if compaction_config.split_weight_by_vnode > 0 {
1233            for table_id in &compact_task.existing_table_ids {
1234                compact_task
1235                    .table_vnode_partition
1236                    .insert(*table_id, compact_task.split_weight_by_vnode);
1237            }
1238
1239            return;
1240        }
1241
1242        // Calculate per-table size from input SSTs
1243        let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
1244        let mut existing_table_ids: HashSet<TableId> = HashSet::default();
1245        for input_ssts in &compact_task.input_ssts {
1246            for sst in &input_ssts.table_infos {
1247                existing_table_ids.extend(sst.table_ids.iter());
1248                for table_id in &sst.table_ids {
1249                    *table_size_info.entry(*table_id).or_default() +=
1250                        sst.sst_size / (sst.table_ids.len() as u64);
1251                }
1252            }
1253        }
1254        compact_task
1255            .existing_table_ids
1256            .retain(|table_id| existing_table_ids.contains(table_id));
1257
1258        let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
1259        let default_partition_count = self.env.opts.partition_vnode_count;
1260        let compact_task_table_size_partition_threshold_low = self
1261            .env
1262            .opts
1263            .compact_task_table_size_partition_threshold_low;
1264        let compact_task_table_size_partition_threshold_high = self
1265            .env
1266            .opts
1267            .compact_task_table_size_partition_threshold_high;
1268
1269        // Check latest write throughput
1270        let table_write_throughput_statistic_manager =
1271            self.table_write_throughput_statistic_manager.read();
1272        let timestamp = chrono::Utc::now().timestamp();
1273
1274        for (table_id, compact_table_size) in table_size_info {
1275            let write_throughput = table_write_throughput_statistic_manager
1276                .get_table_throughput_descending(table_id, timestamp)
1277                .peekable()
1278                .peek()
1279                .map(|item| item.throughput)
1280                .unwrap_or(0);
1281
1282            if compact_table_size > compact_task_table_size_partition_threshold_high
1283                && default_partition_count > 0
1284            {
1285                compact_task
1286                    .table_vnode_partition
1287                    .insert(table_id, default_partition_count);
1288            } else if (compact_table_size > compact_task_table_size_partition_threshold_low
1289                || (write_throughput > self.env.opts.table_high_write_throughput_threshold
1290                    && compact_table_size > compaction_config.target_file_size_base))
1291                && hybrid_vnode_count > 0
1292            {
1293                compact_task
1294                    .table_vnode_partition
1295                    .insert(table_id, hybrid_vnode_count);
1296            } else if compact_table_size > compaction_config.target_file_size_base {
1297                compact_task.table_vnode_partition.insert(table_id, 1);
1298            }
1299        }
1300
1301        compact_task
1302            .table_vnode_partition
1303            .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
1304    }
1305
1306    pub(crate) async fn calculate_vnode_partition(
1307        &self,
1308        compact_task: &mut CompactTask,
1309        compaction_config: &CompactionConfig,
1310        levels: &Levels,
1311    ) -> Result<()> {
1312        // Try vnode-aligned partition first (for large single-table levels)
1313        if self
1314            .try_apply_vnode_aligned_partition(compact_task, compaction_config, levels)
1315            .await?
1316        {
1317            return Ok(());
1318        }
1319
1320        // Do not split sst by vnode partition when target_level > base_level
1321        // The purpose of data alignment is mainly to improve the parallelism of base level compaction
1322        // and reduce write amplification. However, at high level, the size of the sst file is often
1323        // larger and only contains the data of a single table_id, so there is no need to cut it.
1324        if compact_task.target_level > compact_task.base_level {
1325            return Ok(());
1326        }
1327
1328        // Apply split_weight_by_vnode based partition strategy
1329        self.apply_split_weight_by_vnode_partition(compact_task, compaction_config);
1330
1331        Ok(())
1332    }
1333
    /// Returns a clone of this manager's `compactor_manager` handle.
    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
1337}
1338
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Test helper: returns a clone of the assignment recorded for `task_id`, if any.
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let guard = self.compaction.read().await;
        guard.compact_task_assignment.get(&task_id).cloned()
    }

    /// Test helper: reports a task result, optionally overriding the stored task first.
    ///
    /// When `compact_task` is `Some`, the assignment for `task_id` is overwritten with that
    /// task (and context id 0) before the report is submitted through `report_compact_tasks`.
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        // A test may have modified its copy of the compact task without touching
        // `compact_task_assignment`, so the stored assignment is replaced with the test's copy.
        if let Some(task) = compact_task {
            let assignment = CompactTaskAssignment {
                compact_task: Some(task.into()),
                context_id: 0.into(),
            };
            // The temporary write guard is released at the end of this statement, before
            // `report_compact_tasks` takes the lock again.
            self.compaction
                .write()
                .await
                .compact_task_assignment
                .insert(task_id, assignment);
        }

        let report = ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        };
        self.report_compact_tasks(vec![report]).await.map(|_| ())
    }
}
1382
/// Records which `(compaction group, task type)` pairs are currently marked as scheduled.
#[derive(Debug, Default)]
pub struct CompactionState {
    /// Pairs added by `try_sched_compaction` and removed by `unschedule`.
    scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
}
1387
1388impl CompactionState {
1389    pub fn new() -> Self {
1390        Self {
1391            scheduled: Default::default(),
1392        }
1393    }
1394
1395    /// Enqueues only if the target is not yet in queue.
1396    pub fn try_sched_compaction(
1397        &self,
1398        compaction_group: CompactionGroupId,
1399        task_type: TaskType,
1400    ) -> std::result::Result<bool, SendError<CompactionRequestChannelItem>> {
1401        let mut guard = self.scheduled.lock();
1402        let key = (compaction_group, task_type);
1403        if guard.contains(&key) {
1404            return Ok(false);
1405        }
1406        guard.insert(key);
1407        Ok(true)
1408    }
1409
1410    pub fn unschedule(
1411        &self,
1412        compaction_group: CompactionGroupId,
1413        task_type: compact_task::TaskType,
1414    ) {
1415        self.scheduled.lock().remove(&(compaction_group, task_type));
1416    }
1417
1418    pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1419        let guard = self.scheduled.lock();
1420        if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
1421            Some(compact_task::TaskType::Dynamic)
1422        } else if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
1423            Some(compact_task::TaskType::SpaceReclaim)
1424        } else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
1425            Some(compact_task::TaskType::Ttl)
1426        } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
1427            Some(compact_task::TaskType::Tombstone)
1428        } else if guard.contains(&(group, compact_task::TaskType::VnodeWatermark)) {
1429            Some(compact_task::TaskType::VnodeWatermark)
1430        } else {
1431            None
1432        }
1433    }
1434}
1435
1436impl Compaction {
1437    pub fn get_compact_task_assignments_by_group_id(
1438        &self,
1439        compaction_group_id: CompactionGroupId,
1440    ) -> Vec<CompactTaskAssignment> {
1441        self.compact_task_assignment
1442            .iter()
1443            .filter_map(|(_, assignment)| {
1444                if assignment
1445                    .compact_task
1446                    .as_ref()
1447                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
1448                {
1449                    Some(CompactTaskAssignment {
1450                        compact_task: assignment.compact_task.clone(),
1451                        context_id: assignment.context_id,
1452                    })
1453                } else {
1454                    None
1455                }
1456            })
1457            .collect()
1458    }
1459}
1460
/// Statistics describing a single compaction group.
#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    /// Id of the compaction group these statistics belong to.
    pub group_id: CompactionGroupId,
    /// Size of the group. NOTE(review): presumably bytes — confirm with the producer.
    pub group_size: u64,
    /// One value per state table id. NOTE(review): presumably the per-table size — confirm
    /// with the producer.
    pub table_statistic: BTreeMap<StateTableId, u64>,
    /// The `CompactionGroup` value associated with this group.
    pub compaction_group_config: CompactionGroup,
}
1468
1469/// Updates table stats caused by vnode watermark trivial reclaim compaction.
1470fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1471    table_stats: &mut PbTableStatsMap,
1472    task: &CompactTask,
1473) {
1474    if task.task_type != TaskType::VnodeWatermark {
1475        return;
1476    }
1477    let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
1478    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1479        assert_eq!(s.table_ids.len(), 1);
1480        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1481        *e += s.total_key_count;
1482    }
1483    for (table_id, delete_count) in deleted_table_keys {
1484        let Some(stats) = table_stats.get_mut(&table_id) else {
1485            continue;
1486        };
1487        if stats.total_key_count == 0 {
1488            continue;
1489        }
1490        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1491        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1492        // total_key_count is updated accurately.
1493        stats.total_key_count = new_total_key_count;
1494        // others are updated approximately.
1495        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1496        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1497    }
1498}
1499
/// State of a compaction group.
#[derive(Debug, Clone)]
pub enum GroupState {
    /// The compaction group is not in emergency state.
    Normal,

    /// The compaction group is in emergency state; the payload is the reason.
    Emergency(String),

    /// The compaction group is in write stop state; the payload is the reason.
    WriteStop(String),
}

impl GroupState {
    /// Returns `true` only for [`GroupState::WriteStop`].
    pub fn is_write_stop(&self) -> bool {
        match self {
            Self::WriteStop(_) => true,
            Self::Normal | Self::Emergency(_) => false,
        }
    }

    /// Returns `true` only for [`GroupState::Emergency`].
    pub fn is_emergency(&self) -> bool {
        match self {
            Self::Emergency(_) => true,
            Self::Normal | Self::WriteStop(_) => false,
        }
    }

    /// Returns the reason attached to an emergency or write-stop state, and `None` for
    /// [`GroupState::Normal`].
    pub fn reason(&self) -> Option<&str> {
        match self {
            Self::Normal => None,
            Self::Emergency(why) | Self::WriteStop(why) => Some(why.as_str()),
        }
    }
}
1528
1529#[derive(Clone, Default)]
1530pub struct GroupStateValidator;
1531
1532impl GroupStateValidator {
1533    pub fn write_stop_sub_level_count(
1534        level_count: usize,
1535        compaction_config: &CompactionConfig,
1536    ) -> bool {
1537        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1538        level_count > threshold
1539    }
1540
1541    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1542        l0_size
1543            > compaction_config
1544                .level0_stop_write_threshold_max_size
1545                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1546    }
1547
1548    pub fn write_stop_l0_file_count(
1549        l0_file_count: usize,
1550        compaction_config: &CompactionConfig,
1551    ) -> bool {
1552        l0_file_count
1553            > compaction_config
1554                .level0_stop_write_threshold_max_sst_count
1555                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1556                as usize
1557    }
1558
1559    pub fn emergency_l0_file_count(
1560        l0_file_count: usize,
1561        compaction_config: &CompactionConfig,
1562    ) -> bool {
1563        l0_file_count
1564            > compaction_config
1565                .emergency_level0_sst_file_count
1566                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1567                as usize
1568    }
1569
1570    pub fn emergency_l0_partition_count(
1571        last_l0_sub_level_partition_count: usize,
1572        compaction_config: &CompactionConfig,
1573    ) -> bool {
1574        last_l0_sub_level_partition_count
1575            > compaction_config
1576                .emergency_level0_sub_level_partition
1577                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1578                as usize
1579    }
1580
1581    pub fn check_single_group_write_stop(
1582        levels: &Levels,
1583        compaction_config: &CompactionConfig,
1584    ) -> GroupState {
1585        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1586            return GroupState::WriteStop(format!(
1587                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1588                levels.l0.sub_levels.len(),
1589                compaction_config.level0_stop_write_threshold_sub_level_number
1590            ));
1591        }
1592
1593        if Self::write_stop_l0_file_count(
1594            levels
1595                .l0
1596                .sub_levels
1597                .iter()
1598                .map(|l| l.table_infos.len())
1599                .sum(),
1600            compaction_config,
1601        ) {
1602            return GroupState::WriteStop(format!(
1603                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1604                levels
1605                    .l0
1606                    .sub_levels
1607                    .iter()
1608                    .map(|l| l.table_infos.len())
1609                    .sum::<usize>(),
1610                compaction_config
1611                    .level0_stop_write_threshold_max_sst_count
1612                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1613            ));
1614        }
1615
1616        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1617            return GroupState::WriteStop(format!(
1618                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1619                levels.l0.total_file_size,
1620                compaction_config
1621                    .level0_stop_write_threshold_max_size
1622                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1623            ));
1624        }
1625
1626        GroupState::Normal
1627    }
1628
1629    pub fn check_single_group_emergency(
1630        levels: &Levels,
1631        compaction_config: &CompactionConfig,
1632    ) -> GroupState {
1633        if Self::emergency_l0_file_count(
1634            levels
1635                .l0
1636                .sub_levels
1637                .iter()
1638                .map(|l| l.table_infos.len())
1639                .sum(),
1640            compaction_config,
1641        ) {
1642            return GroupState::Emergency(format!(
1643                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1644                levels
1645                    .l0
1646                    .sub_levels
1647                    .iter()
1648                    .map(|l| l.table_infos.len())
1649                    .sum::<usize>(),
1650                compaction_config
1651                    .emergency_level0_sst_file_count
1652                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1653            ));
1654        }
1655
1656        if Self::emergency_l0_partition_count(
1657            levels
1658                .l0
1659                .sub_levels
1660                .first()
1661                .map(|l| l.table_infos.len())
1662                .unwrap_or(0),
1663            compaction_config,
1664        ) {
1665            return GroupState::Emergency(format!(
1666                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1667                levels
1668                    .l0
1669                    .sub_levels
1670                    .first()
1671                    .map(|l| l.table_infos.len())
1672                    .unwrap_or(0),
1673                compaction_config
1674                    .emergency_level0_sub_level_partition
1675                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1676            ));
1677        }
1678
1679        GroupState::Normal
1680    }
1681
1682    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1683        let state = Self::check_single_group_write_stop(levels, compaction_config);
1684        if state.is_write_stop() {
1685            return state;
1686        }
1687
1688        Self::check_single_group_emergency(levels, compaction_config)
1689    }
1690}