risingwave_meta/hummock/manager/compaction/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright 2025 RisingWave Labs
16//
17// Licensed under the Apache License, Version 2.0 (the "License");
18// you may not use this file except in compliance with the License.
19// You may obtain a copy of the License at
20//
21//     http://www.apache.org/licenses/LICENSE-2.0
22//
23// Unless required by applicable law or agreed to in writing, software
24// distributed under the License is distributed on an "AS IS" BASIS,
25// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26// See the License for the specific language governing permissions and
27// limitations under the License.
28
29use std::collections::{BTreeMap, HashMap, HashSet};
30use std::sync::{Arc, LazyLock};
31use std::time::Instant;
32
33use anyhow::Context;
34use compaction_event_loop::{
35    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
36    HummockCompactorDedicatedEventLoop,
37};
38use fail::fail_point;
39use itertools::Itertools;
40use parking_lot::Mutex;
41use rand::rng as thread_rng;
42use rand::seq::SliceRandom;
43use risingwave_common::catalog::TableId;
44use risingwave_common::config::meta::default::compaction_config;
45use risingwave_common::util::epoch::Epoch;
46use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
47use risingwave_hummock_sdk::compaction_group::StateTableId;
48use risingwave_hummock_sdk::key_range::KeyRange;
49use risingwave_hummock_sdk::level::Levels;
50use risingwave_hummock_sdk::sstable_info::SstableInfo;
51use risingwave_hummock_sdk::table_stats::{
52    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
53};
54use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
55use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
56use risingwave_hummock_sdk::{
57    CompactionGroupId, HummockCompactionTaskId, HummockSstableId, HummockSstableObjectId,
58    HummockVersionId, compact_task_to_string, statistics_compact_task,
59};
60use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
61use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
62use risingwave_pb::hummock::{
63    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
64    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
65};
66use thiserror_ext::AsReport;
67use tokio::sync::RwLockWriteGuard;
68use tokio::sync::mpsc::UnboundedReceiver;
69use tokio::sync::mpsc::error::SendError;
70use tokio::sync::oneshot::Sender;
71use tokio::task::JoinHandle;
72use tonic::Streaming;
73use tracing::warn;
74
75use crate::hummock::compaction::selector::level_selector::PickerInfo;
76use crate::hummock::compaction::selector::{
77    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
78    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
79    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
80};
81use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
82use crate::hummock::error::{Error, Result};
83use crate::hummock::manager::transaction::{
84    HummockVersionStatsTransaction, HummockVersionTransaction,
85};
86use crate::hummock::manager::versioning::Versioning;
87use crate::hummock::metrics_utils::{
88    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
89    trigger_local_table_stat,
90};
91use crate::hummock::model::CompactionGroup;
92use crate::hummock::sequence::next_compaction_task_id;
93use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
94use crate::manager::META_NODE_ID;
95use crate::model::BTreeMapTransaction;
96
97pub mod compaction_event_loop;
98pub mod compaction_group_manager;
99pub mod compaction_group_schedule;
100
101static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
102    [
103        TaskStatus::ManualCanceled,
104        TaskStatus::SendFailCanceled,
105        TaskStatus::AssignFailCanceled,
106        TaskStatus::HeartbeatCanceled,
107        TaskStatus::InvalidGroupCanceled,
108        TaskStatus::NoAvailMemoryResourceCanceled,
109        TaskStatus::NoAvailCpuResourceCanceled,
110        TaskStatus::HeartbeatProgressCanceled,
111    ]
112    .into_iter()
113    .collect()
114});
115
116type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);
117
118fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
119    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
120        HashMap::default();
121    compaction_selectors.insert(
122        compact_task::TaskType::Dynamic,
123        Box::<DynamicLevelSelector>::default(),
124    );
125    compaction_selectors.insert(
126        compact_task::TaskType::SpaceReclaim,
127        Box::<SpaceReclaimCompactionSelector>::default(),
128    );
129    compaction_selectors.insert(
130        compact_task::TaskType::Ttl,
131        Box::<TtlCompactionSelector>::default(),
132    );
133    compaction_selectors.insert(
134        compact_task::TaskType::Tombstone,
135        Box::<TombstoneCompactionSelector>::default(),
136    );
137    compaction_selectors.insert(
138        compact_task::TaskType::VnodeWatermark,
139        Box::<VnodeWatermarkCompactionSelector>::default(),
140    );
141    compaction_selectors
142}
143
144impl HummockVersionTransaction<'_> {
145    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
146        let mut version_delta = self.new_delta();
147        let trivial_move = compact_task.is_trivial_move_task();
148        version_delta.trivial_move = trivial_move;
149
150        let group_deltas = &mut version_delta
151            .group_deltas
152            .entry(compact_task.compaction_group_id)
153            .or_default()
154            .group_deltas;
155        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
156            BTreeMap::default();
157
158        for level in &compact_task.input_ssts {
159            let level_idx = level.level_idx;
160
161            removed_table_ids_map
162                .entry(level_idx)
163                .or_default()
164                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
165        }
166
167        for (level_idx, removed_table_ids) in removed_table_ids_map {
168            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
169                level_idx,
170                0, // default
171                removed_table_ids,
172                vec![], // default
173                0,      // default
174                compact_task.compaction_group_version_id,
175            ));
176
177            group_deltas.push(group_delta);
178        }
179
180        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
181            compact_task.target_level,
182            compact_task.target_sub_level_id,
183            HashSet::new(), // default
184            compact_task.sorted_output_ssts.clone(),
185            compact_task.split_weight_by_vnode,
186            compact_task.compaction_group_version_id,
187        ));
188
189        group_deltas.push(group_delta);
190        version_delta.pre_apply();
191    }
192}
193
194#[derive(Default)]
195pub struct Compaction {
196    /// Compaction task that is already assigned to a compactor
197    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
198    /// `CompactStatus` of each compaction group
199    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,
200
201    pub _deterministic_mode: bool,
202}
203
204impl HummockManager {
205    pub async fn get_assigned_compact_task_num(&self) -> u64 {
206        self.compaction.read().await.compact_task_assignment.len() as u64
207    }
208
209    pub async fn list_compaction_status(
210        &self,
211    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
212        let compaction = self.compaction.read().await;
213        (
214            compaction.compaction_statuses.values().map_into().collect(),
215            compaction
216                .compact_task_assignment
217                .values()
218                .cloned()
219                .collect(),
220        )
221    }
222
223    pub async fn get_compaction_scores(
224        &self,
225        compaction_group_id: CompactionGroupId,
226    ) -> Vec<PickerInfo> {
227        let (status, levels, group) = {
228            let compaction = self.compaction.read().await;
229            let versioning = self.versioning.read().await;
230            let config_manager = self.compaction_group_manager.read().await;
231            match (
232                compaction.compaction_statuses.get(&compaction_group_id),
233                versioning.current_version.levels.get(&compaction_group_id),
234                config_manager.try_get_compaction_group_config(compaction_group_id),
235            ) {
236                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
237                _ => {
238                    return vec![];
239                }
240            }
241        };
242        let dynamic_level_core = DynamicLevelSelectorCore::new(
243            group.compaction_config,
244            Arc::new(CompactionDeveloperConfig::default()),
245        );
246        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
247        ctx.score_levels
248    }
249}
250
251impl HummockManager {
252    pub fn compaction_event_loop(
253        hummock_manager: Arc<Self>,
254        compactor_streams_change_rx: UnboundedReceiver<(
255            u32,
256            Streaming<SubscribeCompactionEventRequest>,
257        )>,
258    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
259        let mut join_handle_vec = Vec::default();
260
261        let hummock_compaction_event_handler =
262            HummockCompactionEventHandler::new(hummock_manager.clone());
263
264        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
265            hummock_manager.clone(),
266            hummock_compaction_event_handler.clone(),
267        );
268
269        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
270        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));
271
272        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
273            hummock_manager.env.opts.clone(),
274            hummock_compaction_event_handler,
275            Some(event_tx),
276        );
277
278        let event_loop = HummockCompactionEventLoop::new(
279            hummock_compaction_event_dispatcher,
280            hummock_manager.metrics.clone(),
281            compactor_streams_change_rx,
282        );
283
284        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
285        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
286
287        join_handle_vec
288    }
289
290    pub fn add_compactor_stream(
291        &self,
292        context_id: u32,
293        req_stream: Streaming<SubscribeCompactionEventRequest>,
294    ) {
295        self.compactor_streams_change_tx
296            .send((context_id, req_stream))
297            .unwrap();
298    }
299
300    pub async fn auto_pick_compaction_group_and_type(
301        &self,
302    ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
303        let mut compaction_group_ids = self.compaction_group_ids().await;
304        compaction_group_ids.shuffle(&mut thread_rng());
305
306        for cg_id in compaction_group_ids {
307            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
308                return Some((cg_id, pick_type));
309            }
310        }
311
312        None
313    }
314
315    /// This method will return all compaction group id in a random order and task type. If there are any group block by `write_limit`, it will return a single array with `TaskType::Emergency`.
316    /// If these groups get different task-type, it will return all group id with `TaskType::Dynamic` if the first group get `TaskType::Dynamic`, otherwise it will return the single group with other task type.
317    async fn auto_pick_compaction_groups_and_type(
318        &self,
319    ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
320        let mut compaction_group_ids = self.compaction_group_ids().await;
321        compaction_group_ids.shuffle(&mut thread_rng());
322
323        let mut normal_groups = vec![];
324        for cg_id in compaction_group_ids {
325            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
326                if pick_type == TaskType::Dynamic {
327                    normal_groups.push(cg_id);
328                } else if normal_groups.is_empty() {
329                    return (vec![cg_id], pick_type);
330                }
331            }
332        }
333        (normal_groups, TaskType::Dynamic)
334    }
335}
336
337impl HummockManager {
    /// Picks compaction tasks for `compaction_groups` with `selector` and persists the result.
    ///
    /// Groups are visited in the given order. A group is skipped when it has no levels in the
    /// latest version or no compaction group config. For every other group, tasks are requested
    /// repeatedly from the group's `CompactStatus`:
    /// - a trivial-reclaim task, or a trivial-move task when the selector's type is `Dynamic` or
    ///   `Emergency`, is marked `TaskStatus::Success` and applied to the version right away. Picking
    ///   then continues in the same group;
    /// - the first non-trivial task is enriched, recorded in `compact_task_assignment`, and ends
    ///   picking for that group.
    ///
    /// The outer loop stops once `max_select_count` non-trivial tasks were collected. Picking stops
    /// entirely as soon as `max_trivial_move_task_count_per_loop` trivial tasks were applied.
    ///
    /// Returns `(tasks, unschedule_groups)`:
    /// - `tasks` holds the non-trivial (`Pending`) tasks followed by the trivial (`Success`) ones;
    /// - `unschedule_groups` holds every group that reached the picking loop without producing a
    ///   non-trivial task, including groups that only produced trivial tasks.
    ///
    /// # Errors
    /// Propagates failures from fetching versioned table schemas, allocating a task id, and
    /// committing to the meta store.
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        // Both locks are taken for writing: `compaction` first, then `versioning`.
        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        // All mutations below are staged in these transactions and committed together through
        // `commit_multi_var!` after the picking loop.
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        // Versioned table schemas are only fetched when dropped-column reclaim is enabled; they
        // are attached to non-trivial tasks below (an empty map otherwise).
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        // Groups that did not yield a non-trivial task in this call.
        let mut unschedule_groups = vec![];
        // Trivial move / reclaim tasks that were applied to the version directly.
        let mut trivial_tasks = vec![];
        // Non-trivial tasks that still have to be executed by a compactor.
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // When the last table of a compaction group is deleted, the compaction group (and its
            // config) is destroyed as well. Then a compaction task for this group may come later and
            // cannot find its config.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL.
            // Note: this single id is used for every task the inner loop yields for this group,
            // including trivial tasks.
            let task_id = next_compaction_task_id(&self.env).await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                // lazy initialize.
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            // Trivial moves are only applied in place for `Dynamic` / `Emergency` selectors;
            // trivial reclaims are applied in place regardless of the selector type.
            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            // Snapshot of the group's member tables; copied into each task's `existing_table_ids`.
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            // Table options of the member tables; tables without a registered option are omitted.
            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            // Keep picking from this group until no task is returned, a non-trivial task is
            // picked (`break`), or the trivial-task cap is reached (`break 'outside`).
            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                // Map the algorithm name to its numeric code; any other name maps to 0.
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // only gc delete keys in last level because there may be older version in more bottom
                    // level.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    // Trivial task: complete it in place. Report it to the group's status, apply it
                    // to the version, and keep picking from the same group.
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    // A trivial move keeps the first input level's SSTs as its output; a trivial
                    // reclaim produces no output SSTs.
                    if !is_trivial_reclaim {
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    // NOTE(review): helper is defined outside this view; by its name it only adjusts
                    // table stats for vnode-watermark trivial reclaims — confirm.
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    // NOTE(review): leaving here skips `stats.report_to_metrics` for this group.
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    // Non-trivial task: enrich it with vnode partitioning, watermarks and schemas,
                    // record the assignment, and stop picking from this group.
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                    );

                    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();

                    // Split the safe-epoch watermarks of the compacted tables into pk-prefix and
                    // non-pk-prefix watermarks.
                    let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = version
                        .latest_version()
                        .safe_epoch_table_watermarks(&table_ids_to_be_compacted)
                        .into_iter()
                        .partition(|(_table_id, table_watermarke)| {
                            matches!(
                                table_watermarke.watermark_type,
                                WatermarkSerdeType::PkPrefix
                            )
                        });

                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;

                    // Column ids of the member tables that have a versioned schema (none unless
                    // dropped-column reclaim is enabled).
                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            all_versioned_table_schemas.get(table_id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID, // deprecated
                        },
                    );

                    pick_tasks.push(compact_task);
                    break;
                }

                // Only reached after a trivial task: flush its stats before picking again.
                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            // The group is unscheduled unless the most recently picked non-trivial task is its own.
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            // Trivial tasks changed the version and the stats, so those are committed together
            // with the status and assignment changes.
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // We are using a single transaction to ensure that each task has progress when it is
            // created.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        // Both locks are released from here on; the rest only records metrics and starts
        // heartbeats for the picked tasks.
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Initiate heartbeat for the task to track its progress.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // The task was built with `TaskStatus::Pending`; the copy returned to the caller is
            // kept in that state.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        // Non-trivial tasks first, then the already-applied trivial tasks.
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }
724
725    /// Cancels a compaction task no matter it's assigned or unassigned.
726    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
727        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
728            anyhow::anyhow!("failpoint metastore err")
729        )));
730        let ret = self
731            .cancel_compact_task_impl(vec![task_id], task_status)
732            .await?;
733        Ok(ret[0])
734    }
735
736    pub async fn cancel_compact_tasks(
737        &self,
738        tasks: Vec<u64>,
739        task_status: TaskStatus,
740    ) -> Result<Vec<bool>> {
741        self.cancel_compact_task_impl(tasks, task_status).await
742    }
743
744    async fn cancel_compact_task_impl(
745        &self,
746        task_ids: Vec<u64>,
747        task_status: TaskStatus,
748    ) -> Result<Vec<bool>> {
749        assert!(CANCEL_STATUS_SET.contains(&task_status));
750        let tasks = task_ids
751            .into_iter()
752            .map(|task_id| ReportTask {
753                task_id,
754                task_status,
755                sorted_output_ssts: vec![],
756                table_stats_change: HashMap::default(),
757                object_timestamps: HashMap::default(),
758            })
759            .collect_vec();
760        let rets = self.report_compact_tasks(tasks).await?;
761        #[cfg(test)]
762        {
763            self.check_state_consistency().await;
764        }
765        Ok(rets)
766    }
767
768    async fn get_compact_tasks(
769        &self,
770        mut compaction_groups: Vec<CompactionGroupId>,
771        max_select_count: usize,
772        selector: &mut Box<dyn CompactionSelector>,
773    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
774        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
775            anyhow::anyhow!("failpoint metastore error")
776        )));
777        compaction_groups.shuffle(&mut thread_rng());
778        let (mut tasks, groups) = self
779            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
780            .await?;
781        tasks.retain(|task| {
782            if task.task_status == TaskStatus::Success {
783                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
784                false
785            } else {
786                true
787            }
788        });
789        Ok((tasks, groups))
790    }
791
792    pub async fn get_compact_task(
793        &self,
794        compaction_group_id: CompactionGroupId,
795        selector: &mut Box<dyn CompactionSelector>,
796    ) -> Result<Option<CompactTask>> {
797        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
798            anyhow::anyhow!("failpoint metastore error")
799        )));
800
801        let (normal_tasks, _) = self
802            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
803            .await?;
804        for task in normal_tasks {
805            if task.task_status != TaskStatus::Success {
806                return Ok(Some(task));
807            }
808            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
809        }
810        Ok(None)
811    }
812
813    pub async fn manual_get_compact_task(
814        &self,
815        compaction_group_id: CompactionGroupId,
816        manual_compaction_option: ManualCompactionOption,
817    ) -> Result<Option<CompactTask>> {
818        let mut selector: Box<dyn CompactionSelector> =
819            Box::new(ManualCompactionSelector::new(manual_compaction_option));
820        self.get_compact_task(compaction_group_id, &mut selector)
821            .await
822    }
823
824    pub async fn report_compact_task(
825        &self,
826        task_id: u64,
827        task_status: TaskStatus,
828        sorted_output_ssts: Vec<SstableInfo>,
829        table_stats_change: Option<PbTableStatsMap>,
830        object_timestamps: HashMap<HummockSstableObjectId, u64>,
831    ) -> Result<bool> {
832        let rets = self
833            .report_compact_tasks(vec![ReportTask {
834                task_id,
835                task_status,
836                sorted_output_ssts,
837                table_stats_change: table_stats_change.unwrap_or_default(),
838                object_timestamps,
839            }])
840            .await?;
841        Ok(rets[0])
842    }
843
844    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
845        let compaction_guard = self.compaction.write().await;
846        let versioning_guard = self.versioning.write().await;
847
848        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
849            .await
850    }
851
852    /// Finishes or cancels a compaction task, according to `task_status`.
853    ///
854    /// If `context_id` is not None, its validity will be checked when writing meta store.
855    /// Its ownership of the task is checked as well.
856    ///
857    /// Return Ok(false) indicates either the task is not found,
858    /// or the task is not owned by `context_id` when `context_id` is not None.
859    pub async fn report_compact_tasks_impl(
860        &self,
861        report_tasks: Vec<ReportTask>,
862        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
863        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
864    ) -> Result<Vec<bool>> {
865        let deterministic_mode = self.env.opts.compaction_deterministic_test;
866        let compaction: &mut Compaction = &mut compaction_guard;
867        let start_time = Instant::now();
868        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
869        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
870        let mut rets = vec![false; report_tasks.len()];
871        let mut compact_task_assignment =
872            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
873        // The compaction task is finished.
874        let versioning: &mut Versioning = &mut versioning_guard;
875        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");
876
877        // purge stale compact_status
878        for group_id in original_keys {
879            if !versioning.current_version.levels.contains_key(&group_id) {
880                compact_statuses.remove(group_id);
881            }
882        }
883        let mut tasks = vec![];
884
885        let mut version = HummockVersionTransaction::new(
886            &mut versioning.current_version,
887            &mut versioning.hummock_version_deltas,
888            self.env.notification_manager(),
889            None,
890            &self.metrics,
891        );
892
893        if deterministic_mode {
894            version.disable_apply_to_txn();
895        }
896
897        let mut version_stats = HummockVersionStatsTransaction::new(
898            &mut versioning.version_stats,
899            self.env.notification_manager(),
900        );
901        let mut success_count = 0;
902        for (idx, task) in report_tasks.into_iter().enumerate() {
903            rets[idx] = true;
904            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
905                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
906                None => {
907                    tracing::warn!("{}", format!("compact task {} not found", task.task_id));
908                    rets[idx] = false;
909                    continue;
910                }
911            };
912
913            {
914                // apply result
915                compact_task.task_status = task.task_status;
916                compact_task.sorted_output_ssts = task.sorted_output_ssts;
917            }
918
919            match compact_statuses.get_mut(compact_task.compaction_group_id) {
920                Some(mut compact_status) => {
921                    compact_status.report_compact_task(&compact_task);
922                }
923                None => {
924                    // When the group_id is not found in the compaction_statuses, it means the group has been removed.
925                    // The task is invalid and should be canceled.
926                    // e.g.
927                    // 1. The group is removed by the user unregistering the tables
928                    // 2. The group is removed by the group scheduling algorithm
929                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
930                }
931            }
932
933            let is_success = if let TaskStatus::Success = compact_task.task_status {
934                match self
935                    .report_compaction_sanity_check(&task.object_timestamps)
936                    .await
937                {
938                    Err(e) => {
939                        warn!(
940                            "failed to commit compaction task {} {}",
941                            compact_task.task_id,
942                            e.as_report()
943                        );
944                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
945                        false
946                    }
947                    _ => {
948                        let group = version
949                            .latest_version()
950                            .levels
951                            .get(&compact_task.compaction_group_id)
952                            .unwrap();
953                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
954                        if is_expired {
955                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
956                            warn!(
957                                "The task may be expired because of group split, task:\n {:?}",
958                                compact_task_to_string(&compact_task)
959                            );
960                        }
961                        !is_expired
962                    }
963                }
964            } else {
965                false
966            };
967            if is_success {
968                success_count += 1;
969                version.apply_compact_task(&compact_task);
970                if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version())
971                {
972                    self.metrics.version_stats.reset();
973                    versioning.local_metrics.clear();
974                }
975                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
976                trigger_local_table_stat(
977                    &self.metrics,
978                    &mut versioning.local_metrics,
979                    &version_stats,
980                    &task.table_stats_change,
981                );
982            }
983            tasks.push(compact_task);
984        }
985        if success_count > 0 {
986            commit_multi_var!(
987                self.meta_store_ref(),
988                compact_statuses,
989                compact_task_assignment,
990                version,
991                version_stats
992            )?;
993
994            self.metrics
995                .compact_task_batch_count
996                .with_label_values(&["batch_report_task"])
997                .observe(success_count as f64);
998        } else {
999            // The compaction task is cancelled or failed.
1000            commit_multi_var!(
1001                self.meta_store_ref(),
1002                compact_statuses,
1003                compact_task_assignment
1004            )?;
1005        }
1006
1007        let mut success_groups = vec![];
1008        for compact_task in &tasks {
1009            self.compactor_manager
1010                .remove_task_heartbeat(compact_task.task_id);
1011            tracing::trace!(
1012                "Reported compaction task. {}. cost time: {:?}",
1013                compact_task_to_string(compact_task),
1014                start_time.elapsed(),
1015            );
1016
1017            if !deterministic_mode
1018                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
1019                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
1020            {
1021                // only try send Dynamic compaction
1022                self.try_send_compaction_request(
1023                    compact_task.compaction_group_id,
1024                    compact_task::TaskType::Dynamic,
1025                );
1026            }
1027
1028            if compact_task.task_status == TaskStatus::Success {
1029                success_groups.push(compact_task.compaction_group_id);
1030            }
1031        }
1032
1033        trigger_compact_tasks_stat(
1034            &self.metrics,
1035            &tasks,
1036            &compaction.compaction_statuses,
1037            &versioning_guard.current_version,
1038        );
1039        drop(versioning_guard);
1040        if !success_groups.is_empty() {
1041            self.try_update_write_limits(&success_groups).await;
1042        }
1043        Ok(rets)
1044    }
1045
1046    /// Triggers compacitons to specified compaction groups.
1047    /// Don't wait for compaction finish
1048    pub async fn trigger_compaction_deterministic(
1049        &self,
1050        _base_version_id: HummockVersionId,
1051        compaction_groups: Vec<CompactionGroupId>,
1052    ) -> Result<()> {
1053        self.on_current_version(|old_version| {
1054            tracing::info!(
1055                "Trigger compaction for version {}, groups {:?}",
1056                old_version.id,
1057                compaction_groups
1058            );
1059        })
1060        .await;
1061
1062        if compaction_groups.is_empty() {
1063            return Ok(());
1064        }
1065        for compaction_group in compaction_groups {
1066            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
1067        }
1068        Ok(())
1069    }
1070
1071    pub async fn trigger_manual_compaction(
1072        &self,
1073        compaction_group: CompactionGroupId,
1074        manual_compaction_option: ManualCompactionOption,
1075    ) -> Result<()> {
1076        let start_time = Instant::now();
1077
1078        // 1. Get idle compactor.
1079        let compactor = match self.compactor_manager.next_compactor() {
1080            Some(compactor) => compactor,
1081            None => {
1082                tracing::warn!("trigger_manual_compaction No compactor is available.");
1083                return Err(anyhow::anyhow!(
1084                    "trigger_manual_compaction No compactor is available. compaction_group {}",
1085                    compaction_group
1086                )
1087                .into());
1088            }
1089        };
1090
1091        // 2. Get manual compaction task.
1092        let compact_task = self
1093            .manual_get_compact_task(compaction_group, manual_compaction_option)
1094            .await;
1095        let compact_task = match compact_task {
1096            Ok(Some(compact_task)) => compact_task,
1097            Ok(None) => {
1098                // No compaction task available.
1099                return Err(anyhow::anyhow!(
1100                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
1101                    compaction_group
1102                )
1103                    .into());
1104            }
1105            Err(err) => {
1106                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
1107
1108                return Err(anyhow::anyhow!(err)
1109                    .context(format!(
1110                        "Failed to get compaction task for compaction_group {}",
1111                        compaction_group,
1112                    ))
1113                    .into());
1114            }
1115        };
1116
1117        // 3. send task to compactor
1118        let compact_task_string = compact_task_to_string(&compact_task);
1119        // TODO: shall we need to cancel on meta ?
1120        compactor
1121            .send_event(ResponseEvent::CompactTask(compact_task.into()))
1122            .with_context(|| {
1123                format!(
1124                    "Failed to trigger compaction task for compaction_group {}",
1125                    compaction_group,
1126                )
1127            })?;
1128
1129        tracing::info!(
1130            "Trigger manual compaction task. {}. cost time: {:?}",
1131            &compact_task_string,
1132            start_time.elapsed(),
1133        );
1134
1135        Ok(())
1136    }
1137
1138    /// Sends a compaction request.
1139    pub fn try_send_compaction_request(
1140        &self,
1141        compaction_group: CompactionGroupId,
1142        task_type: compact_task::TaskType,
1143    ) -> bool {
1144        match self
1145            .compaction_state
1146            .try_sched_compaction(compaction_group, task_type)
1147        {
1148            Ok(_) => true,
1149            Err(e) => {
1150                tracing::error!(
1151                    error = %e.as_report(),
1152                    "failed to send compaction request for compaction group {}",
1153                    compaction_group,
1154                );
1155                false
1156            }
1157        }
1158    }
1159
1160    pub(crate) fn calculate_vnode_partition(
1161        &self,
1162        compact_task: &mut CompactTask,
1163        compaction_config: &CompactionConfig,
1164    ) {
1165        // do not split sst by vnode partition when target_level > base_level
1166        // The purpose of data alignment is mainly to improve the parallelism of base level compaction and reduce write amplification.
1167        // However, at high level, the size of the sst file is often larger and only contains the data of a single table_id, so there is no need to cut it.
1168        if compact_task.target_level > compact_task.base_level {
1169            return;
1170        }
1171        if compaction_config.split_weight_by_vnode > 0 {
1172            for table_id in &compact_task.existing_table_ids {
1173                compact_task
1174                    .table_vnode_partition
1175                    .insert(*table_id, compact_task.split_weight_by_vnode);
1176            }
1177        } else {
1178            let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
1179            let mut existing_table_ids: HashSet<TableId> = HashSet::default();
1180            for input_ssts in &compact_task.input_ssts {
1181                for sst in &input_ssts.table_infos {
1182                    existing_table_ids.extend(sst.table_ids.iter());
1183                    for table_id in &sst.table_ids {
1184                        *table_size_info.entry(*table_id).or_default() +=
1185                            sst.sst_size / (sst.table_ids.len() as u64);
1186                    }
1187                }
1188            }
1189            compact_task
1190                .existing_table_ids
1191                .retain(|table_id| existing_table_ids.contains(table_id));
1192
1193            let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
1194            let default_partition_count = self.env.opts.partition_vnode_count;
1195            // We must ensure the partition threshold large enough to avoid too many small files.
1196            let compact_task_table_size_partition_threshold_low = self
1197                .env
1198                .opts
1199                .compact_task_table_size_partition_threshold_low;
1200            let compact_task_table_size_partition_threshold_high = self
1201                .env
1202                .opts
1203                .compact_task_table_size_partition_threshold_high;
1204            // check latest write throughput
1205            let table_write_throughput_statistic_manager =
1206                self.table_write_throughput_statistic_manager.read();
1207            let timestamp = chrono::Utc::now().timestamp();
1208            for (table_id, compact_table_size) in table_size_info {
1209                let write_throughput = table_write_throughput_statistic_manager
1210                    .get_table_throughput_descending(table_id, timestamp)
1211                    .peekable()
1212                    .peek()
1213                    .map(|item| item.throughput)
1214                    .unwrap_or(0);
1215                if compact_table_size > compact_task_table_size_partition_threshold_high
1216                    && default_partition_count > 0
1217                {
1218                    compact_task
1219                        .table_vnode_partition
1220                        .insert(table_id, default_partition_count);
1221                } else if (compact_table_size > compact_task_table_size_partition_threshold_low
1222                    || (write_throughput > self.env.opts.table_high_write_throughput_threshold
1223                        && compact_table_size > compaction_config.target_file_size_base))
1224                    && hybrid_vnode_count > 0
1225                {
1226                    // partition for large write throughput table. But we also need to make sure that it can not be too small.
1227                    compact_task
1228                        .table_vnode_partition
1229                        .insert(table_id, hybrid_vnode_count);
1230                } else if compact_table_size > compaction_config.target_file_size_base {
1231                    // partition for small table
1232                    compact_task.table_vnode_partition.insert(table_id, 1);
1233                }
1234            }
1235            compact_task
1236                .table_vnode_partition
1237                .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
1238        }
1239    }
1240
1241    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
1242        self.compactor_manager.clone()
1243    }
1244}
1245
#[cfg(any(test, feature = "test"))]
impl HummockManager {
    /// Returns a clone of the assignment recorded for `task_id`, if any.
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction = self.compaction.read().await;
        compaction.compact_task_assignment.get(&task_id).cloned()
    }

    /// Reports a task result in tests, optionally (re)registering the task first.
    ///
    /// In tests the caller may modify a `CompactTask` directly without updating
    /// `compact_task_assignment`. When `compact_task` is `Some`, it is therefore written into
    /// the assignment map under `task_id` (overwriting any existing entry, with
    /// `context_id: 0`) before the report is processed. The returned found-flag is discarded.
    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            // The write guard is released at the end of this block, before reporting
            // re-acquires the lock.
            let mut guard = self.compaction.write().await;
            guard.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0,
                },
            );
        }

        self.report_compact_task(
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change,
            HashMap::default(),
        )
        .await?;
        Ok(())
    }
}
1289
1290#[derive(Debug, Default)]
1291pub struct CompactionState {
1292    scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
1293}
1294
1295impl CompactionState {
1296    pub fn new() -> Self {
1297        Self {
1298            scheduled: Default::default(),
1299        }
1300    }
1301
1302    /// Enqueues only if the target is not yet in queue.
1303    pub fn try_sched_compaction(
1304        &self,
1305        compaction_group: CompactionGroupId,
1306        task_type: TaskType,
1307    ) -> std::result::Result<bool, SendError<CompactionRequestChannelItem>> {
1308        let mut guard = self.scheduled.lock();
1309        let key = (compaction_group, task_type);
1310        if guard.contains(&key) {
1311            return Ok(false);
1312        }
1313        guard.insert(key);
1314        Ok(true)
1315    }
1316
1317    pub fn unschedule(
1318        &self,
1319        compaction_group: CompactionGroupId,
1320        task_type: compact_task::TaskType,
1321    ) {
1322        self.scheduled.lock().remove(&(compaction_group, task_type));
1323    }
1324
1325    pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1326        let guard = self.scheduled.lock();
1327        if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
1328            Some(compact_task::TaskType::Dynamic)
1329        } else if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
1330            Some(compact_task::TaskType::SpaceReclaim)
1331        } else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
1332            Some(compact_task::TaskType::Ttl)
1333        } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
1334            Some(compact_task::TaskType::Tombstone)
1335        } else if guard.contains(&(group, compact_task::TaskType::VnodeWatermark)) {
1336            Some(compact_task::TaskType::VnodeWatermark)
1337        } else {
1338            None
1339        }
1340    }
1341}
1342
1343impl Compaction {
1344    pub fn get_compact_task_assignments_by_group_id(
1345        &self,
1346        compaction_group_id: CompactionGroupId,
1347    ) -> Vec<CompactTaskAssignment> {
1348        self.compact_task_assignment
1349            .iter()
1350            .filter_map(|(_, assignment)| {
1351                if assignment
1352                    .compact_task
1353                    .as_ref()
1354                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
1355                {
1356                    Some(CompactTaskAssignment {
1357                        compact_task: assignment.compact_task.clone(),
1358                        context_id: assignment.context_id,
1359                    })
1360                } else {
1361                    None
1362                }
1363            })
1364            .collect()
1365    }
1366}
1367
/// Statistics describing a single compaction group.
#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    /// Id of the compaction group.
    pub group_id: CompactionGroupId,
    /// Total size of the group (presumably bytes — TODO confirm against the producer).
    pub group_size: u64,
    /// Per-state-table statistic keyed by table id (presumably size in bytes — confirm).
    pub table_statistic: BTreeMap<StateTableId, u64>,
    /// The compaction group (including its config) these statistics belong to.
    pub compaction_group_config: CompactionGroup,
}
1375
1376/// Updates table stats caused by vnode watermark trivial reclaim compaction.
1377fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1378    table_stats: &mut PbTableStatsMap,
1379    task: &CompactTask,
1380) {
1381    if task.task_type != TaskType::VnodeWatermark {
1382        return;
1383    }
1384    let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
1385    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
1386        assert_eq!(s.table_ids.len(), 1);
1387        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1388        *e += s.total_key_count;
1389    }
1390    for (table_id, delete_count) in deleted_table_keys {
1391        let Some(stats) = table_stats.get_mut(&table_id.as_raw_id()) else {
1392            continue;
1393        };
1394        if stats.total_key_count == 0 {
1395            continue;
1396        }
1397        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1398        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1399        // total_key_count is updated accurately.
1400        stats.total_key_count = new_total_key_count;
1401        // others are updated approximately.
1402        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1403        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1404    }
1405}
1406
/// State of a compaction group with respect to the emergency and write-stop thresholds,
/// as produced by the `GroupStateValidator` checks.
#[derive(Debug, Clone)]
pub enum GroupState {
    /// The compaction group is neither in emergency nor in write stop state.
    Normal,

    /// The compaction group is in emergency state.
    Emergency(String), // human-readable reason

    /// The compaction group is in write stop state.
    WriteStop(String), // human-readable reason
}
1418
1419impl GroupState {
1420    pub fn is_write_stop(&self) -> bool {
1421        matches!(self, Self::WriteStop(_))
1422    }
1423
1424    pub fn is_emergency(&self) -> bool {
1425        matches!(self, Self::Emergency(_))
1426    }
1427
1428    pub fn reason(&self) -> Option<&str> {
1429        match self {
1430            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
1431            _ => None,
1432        }
1433    }
1434}
1435
/// Stateless set of threshold checks that classify a compaction group's L0 (sub-level count,
/// SST count, total size, sub-level partition count) into a [`GroupState`].
#[derive(Clone, Default)]
pub struct GroupStateValidator;
1438
1439impl GroupStateValidator {
1440    pub fn write_stop_sub_level_count(
1441        level_count: usize,
1442        compaction_config: &CompactionConfig,
1443    ) -> bool {
1444        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1445        level_count > threshold
1446    }
1447
1448    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1449        l0_size
1450            > compaction_config
1451                .level0_stop_write_threshold_max_size
1452                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1453    }
1454
1455    pub fn write_stop_l0_file_count(
1456        l0_file_count: usize,
1457        compaction_config: &CompactionConfig,
1458    ) -> bool {
1459        l0_file_count
1460            > compaction_config
1461                .level0_stop_write_threshold_max_sst_count
1462                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1463                as usize
1464    }
1465
1466    pub fn emergency_l0_file_count(
1467        l0_file_count: usize,
1468        compaction_config: &CompactionConfig,
1469    ) -> bool {
1470        l0_file_count
1471            > compaction_config
1472                .emergency_level0_sst_file_count
1473                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1474                as usize
1475    }
1476
1477    pub fn emergency_l0_partition_count(
1478        last_l0_sub_level_partition_count: usize,
1479        compaction_config: &CompactionConfig,
1480    ) -> bool {
1481        last_l0_sub_level_partition_count
1482            > compaction_config
1483                .emergency_level0_sub_level_partition
1484                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1485                as usize
1486    }
1487
1488    pub fn check_single_group_write_stop(
1489        levels: &Levels,
1490        compaction_config: &CompactionConfig,
1491    ) -> GroupState {
1492        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1493            return GroupState::WriteStop(format!(
1494                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1495                levels.l0.sub_levels.len(),
1496                compaction_config.level0_stop_write_threshold_sub_level_number
1497            ));
1498        }
1499
1500        if Self::write_stop_l0_file_count(
1501            levels
1502                .l0
1503                .sub_levels
1504                .iter()
1505                .map(|l| l.table_infos.len())
1506                .sum(),
1507            compaction_config,
1508        ) {
1509            return GroupState::WriteStop(format!(
1510                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1511                levels
1512                    .l0
1513                    .sub_levels
1514                    .iter()
1515                    .map(|l| l.table_infos.len())
1516                    .sum::<usize>(),
1517                compaction_config
1518                    .level0_stop_write_threshold_max_sst_count
1519                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1520            ));
1521        }
1522
1523        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1524            return GroupState::WriteStop(format!(
1525                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1526                levels.l0.total_file_size,
1527                compaction_config
1528                    .level0_stop_write_threshold_max_size
1529                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1530            ));
1531        }
1532
1533        GroupState::Normal
1534    }
1535
1536    pub fn check_single_group_emergency(
1537        levels: &Levels,
1538        compaction_config: &CompactionConfig,
1539    ) -> GroupState {
1540        if Self::emergency_l0_file_count(
1541            levels
1542                .l0
1543                .sub_levels
1544                .iter()
1545                .map(|l| l.table_infos.len())
1546                .sum(),
1547            compaction_config,
1548        ) {
1549            return GroupState::Emergency(format!(
1550                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
1551                levels
1552                    .l0
1553                    .sub_levels
1554                    .iter()
1555                    .map(|l| l.table_infos.len())
1556                    .sum::<usize>(),
1557                compaction_config
1558                    .emergency_level0_sst_file_count
1559                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1560            ));
1561        }
1562
1563        if Self::emergency_l0_partition_count(
1564            levels
1565                .l0
1566                .sub_levels
1567                .first()
1568                .map(|l| l.table_infos.len())
1569                .unwrap_or(0),
1570            compaction_config,
1571        ) {
1572            return GroupState::Emergency(format!(
1573                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
1574                levels
1575                    .l0
1576                    .sub_levels
1577                    .first()
1578                    .map(|l| l.table_infos.len())
1579                    .unwrap_or(0),
1580                compaction_config
1581                    .emergency_level0_sub_level_partition
1582                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1583            ));
1584        }
1585
1586        GroupState::Normal
1587    }
1588
1589    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1590        let state = Self::check_single_group_write_stop(levels, compaction_config);
1591        if state.is_write_stop() {
1592            return state;
1593        }
1594
1595        Self::check_single_group_emergency(levels, compaction_config)
1596    }
1597}