risingwave_meta/hummock/manager/compaction/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::Instant;

use anyhow::Context;
use compaction_event_loop::{
    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
    HummockCompactorDedicatedEventLoop,
};
use fail::fail_point;
use itertools::Itertools;
use parking_lot::Mutex;
use rand::rng as thread_rng;
use rand::seq::SliceRandom;
use risingwave_common::config::meta::default::compaction_config;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::level::Levels;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
};
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockSstableId, HummockSstableObjectId,
    HummockVersionId, compact_task_to_string, statistics_compact_task,
};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
};
use thiserror_ext::AsReport;
use tokio::sync::RwLockWriteGuard;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;
use tracing::warn;

use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::{
    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
};
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::{
    HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
    trigger_local_table_stat,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::next_compaction_task_id;
use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
use crate::manager::META_NODE_ID;
use crate::model::BTreeMapTransaction;

pub mod compaction_event_loop;
pub mod compaction_group_manager;
pub mod compaction_group_schedule;

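/// Task statuses that represent a cancellation outcome; `cancel_compact_task_impl`
/// asserts that the status it is given belongs to this set.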
static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
    [
        TaskStatus::ManualCanceled,
        TaskStatus::SendFailCanceled,
        TaskStatus::AssignFailCanceled,
        TaskStatus::HeartbeatCanceled,
        TaskStatus::InvalidGroupCanceled,
        TaskStatus::NoAvailMemoryResourceCanceled,
        TaskStatus::NoAvailCpuResourceCanceled,
        TaskStatus::HeartbeatProgressCanceled,
    ]
    .into_iter()
    .collect()
});

type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);

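/// Builds the default selector for each background compaction task type. A minimal
/// usage sketch (illustrative only, not compiled):
///
/// ```ignore
/// let mut selectors = init_selectors();
/// let selector = selectors
///     .get_mut(&compact_task::TaskType::Dynamic)
///     .expect("dynamic selector is always registered");
/// ```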
fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
        HashMap::default();
    compaction_selectors.insert(
        compact_task::TaskType::Dynamic,
        Box::<DynamicLevelSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::SpaceReclaim,
        Box::<SpaceReclaimCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Ttl,
        Box::<TtlCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Tombstone,
        Box::<TombstoneCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::VnodeWatermark,
        Box::<VnodeWatermarkCompactionSelector>::default(),
    );
    compaction_selectors
}

impl HummockVersionTransaction<'_> {
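    /// Pre-applies a compaction result to the pending version delta: one
    /// `IntraLevelDelta` per input level removing the input SSTs, plus a final
    /// delta inserting the sorted output SSTs into the target (sub-)level.
    ///
    /// For a task merging two L0 SSTs into one L1 SST, the deltas look roughly
    /// like this (illustrative pseudo-values only):
    ///
    /// ```text
    /// IntraLevelDelta { level_idx: 0, removed_table_ids: {sst_1, sst_2}, .. }
    /// IntraLevelDelta { level_idx: 1, inserted_table_infos: [sst_3], .. }
    /// ```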
    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
        let mut version_delta = self.new_delta();
        let trivial_move = compact_task.is_trivial_move_task();
        version_delta.trivial_move = trivial_move;

        let group_deltas = &mut version_delta
            .group_deltas
            .entry(compact_task.compaction_group_id)
            .or_default()
            .group_deltas;
        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
            BTreeMap::default();

        for level in &compact_task.input_ssts {
            let level_idx = level.level_idx;

            removed_table_ids_map
                .entry(level_idx)
                .or_default()
                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
        }

        for (level_idx, removed_table_ids) in removed_table_ids_map {
            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
                level_idx,
                0, // default
                removed_table_ids,
                vec![], // default
                0,      // default
                compact_task.compaction_group_version_id,
            ));

            group_deltas.push(group_delta);
        }

        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
            compact_task.target_level,
            compact_task.target_sub_level_id,
            HashSet::new(), // default
            compact_task.sorted_output_ssts.clone(),
            compact_task.split_weight_by_vnode,
            compact_task.compaction_group_version_id,
        ));

        group_deltas.push(group_delta);
        version_delta.pre_apply();
    }
}

#[derive(Default)]
pub struct Compaction {
    /// Compaction tasks that are already assigned to a compactor
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// `CompactStatus` of each compaction group
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    pub _deterministic_mode: bool,
}

impl HummockManager {
    pub async fn get_assigned_compact_task_num(&self) -> u64 {
        self.compaction.read().await.compact_task_assignment.len() as u64
    }

    pub async fn list_compaction_status(
        &self,
    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
        let compaction = self.compaction.read().await;
        (
            compaction.compaction_statuses.values().map_into().collect(),
            compaction
                .compact_task_assignment
                .values()
                .cloned()
                .collect(),
        )
    }

    pub async fn get_compaction_scores(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<PickerInfo> {
        let (status, levels, group) = {
            let compaction = self.compaction.read().await;
            let versioning = self.versioning.read().await;
            let config_manager = self.compaction_group_manager.read().await;
            match (
                compaction.compaction_statuses.get(&compaction_group_id),
                versioning.current_version.levels.get(&compaction_group_id),
                config_manager.try_get_compaction_group_config(compaction_group_id),
            ) {
                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
                _ => {
                    return vec![];
                }
            }
        };
        let dynamic_level_core = DynamicLevelSelectorCore::new(
            group.compaction_config,
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
        ctx.score_levels
    }
}

impl HummockManager {
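    /// Spawns the compaction event loops and returns their join handles paired with
    /// shutdown senders: one pair for the dedicated compactor event loop, and one for
    /// the dispatcher loop that consumes `compactor_streams_change_rx`.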
    pub fn compaction_event_loop(
        hummock_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let hummock_compaction_event_handler =
            HummockCompactionEventHandler::new(hummock_manager.clone());

        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
            hummock_manager.clone(),
            hummock_compaction_event_handler.clone(),
        );

        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));

        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
            hummock_manager.env.opts.clone(),
            hummock_compaction_event_handler,
            Some(event_tx),
        );

        let event_loop = HummockCompactionEventLoop::new(
            hummock_compaction_event_dispatcher,
            hummock_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

    pub fn add_compactor_stream(
        &self,
        context_id: u32,
        req_stream: Streaming<SubscribeCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

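    /// Iterates all compaction groups in random order and returns the first group
    /// that has a scheduled task, together with the picked task type.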
    pub async fn auto_pick_compaction_group_and_type(
        &self,
    ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                return Some((cg_id, pick_type));
            }
        }

        None
    }

    /// Returns compaction group ids in random order, together with a single task type.
    /// If any group is blocked by the write limit, a single-element array is returned
    /// with `TaskType::Emergency`. If the groups pick different task types, all groups
    /// picking `TaskType::Dynamic` are returned together when a dynamic group is seen
    /// first; otherwise the single group with the non-dynamic task type is returned.
    async fn auto_pick_compaction_groups_and_type(
        &self,
    ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        let mut normal_groups = vec![];
        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                if pick_type == TaskType::Dynamic {
                    normal_groups.push(cg_id);
                } else if normal_groups.is_empty() {
                    return (vec![cg_id], pick_type);
                }
            }
        }
        (normal_groups, TaskType::Dynamic)
    }
}

impl HummockManager {
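    /// Picks at most `max_select_count` compaction tasks across `compaction_groups`.
    /// Trivial-move and trivial-reclaim tasks are applied to the version immediately
    /// and reported as `Success`; other tasks are persisted to
    /// `compact_task_assignment` and returned as `Pending`. Groups that yielded no
    /// task are returned to the caller as well.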
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // When the last table of a compaction group is deleted, the compaction group (and its
            // config) is destroyed as well. Then a compaction task for this group may come later and
            // cannot find its config.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            // StoredIdGenerator already implements id pre-allocation in batches of ID_PREALLOCATE_INTERVAL.
            let task_id = next_compaction_task_id(&self.env).await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                // lazy initialize.
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .map(|table_id| table_id.table_id)
                .collect();

            let mut table_id_to_option: HashMap<u32, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // Only GC delete keys in the last level, because older versions
                    // of the same key may still exist in deeper levels.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                    );

                    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();

                    let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = version
                        .latest_version()
                        .safe_epoch_table_watermarks(&table_ids_to_be_compacted)
                        .into_iter()
                        .partition(|(_table_id, table_watermark)| {
                            matches!(
                                table_watermark.watermark_type,
                                WatermarkSerdeType::PkPrefix
                            )
                        });

                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;

                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            let id = (*table_id).try_into().unwrap();
                            all_versioned_table_schemas.get(&id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID, // deprecated
                        },
                    );

                    pick_tasks.push(compact_task);
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // We are using a single transaction to ensure that each task has progress when it is
            // created.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Initiate heartbeat for the task to track its progress.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // The task was just picked; keep it pending until the compactor reports it.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }

    /// Cancels a compaction task, whether it is assigned or unassigned.
    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore err")
        )));
        let ret = self
            .cancel_compact_task_impl(vec![task_id], task_status)
            .await?;
        Ok(ret[0])
    }

    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }

    async fn cancel_compact_task_impl(
        &self,
        task_ids: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        assert!(CANCEL_STATUS_SET.contains(&task_status));
        let tasks = task_ids
            .into_iter()
            .map(|task_id| ReportTask {
                task_id,
                task_status,
                sorted_output_ssts: vec![],
                table_stats_change: HashMap::default(),
                object_timestamps: HashMap::default(),
            })
            .collect_vec();
        let rets = self.report_compact_tasks(tasks).await?;
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(rets)
    }

    async fn get_compact_tasks(
        &self,
        mut compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));
        compaction_groups.shuffle(&mut thread_rng());
        let (mut tasks, groups) = self
            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
            .await?;
        tasks.retain(|task| {
            if task.task_status == TaskStatus::Success {
                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
                false
            } else {
                true
            }
        });
        Ok((tasks, groups))
    }

    pub async fn get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<Option<CompactTask>> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));

        let (normal_tasks, _) = self
            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
            .await?;
        for task in normal_tasks {
            if task.task_status != TaskStatus::Success {
                return Ok(Some(task));
            }
            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
        }
        Ok(None)
    }

    pub async fn manual_get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<Option<CompactTask>> {
        let mut selector: Box<dyn CompactionSelector> =
            Box::new(ManualCompactionSelector::new(manual_compaction_option));
        self.get_compact_task(compaction_group_id, &mut selector)
            .await
    }

    pub async fn report_compact_task(
        &self,
        task_id: u64,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
        object_timestamps: HashMap<HummockSstableObjectId, u64>,
    ) -> Result<bool> {
        let rets = self
            .report_compact_tasks(vec![ReportTask {
                task_id,
                task_status,
                sorted_output_ssts,
                table_stats_change: table_stats_change.unwrap_or_default(),
                object_timestamps,
            }])
            .await?;
        Ok(rets[0])
    }

    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
        let compaction_guard = self.compaction.write().await;
        let versioning_guard = self.versioning.write().await;

        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
            .await
    }

    /// Finishes or cancels a compaction task, according to `task_status`.
    ///
    /// If `context_id` is not `None`, its validity is checked when writing to the meta
    /// store, and its ownership of the task is checked as well.
    ///
    /// Returning `Ok(false)` indicates that either the task was not found, or the task
    /// is not owned by `context_id` when `context_id` is not `None`.
    pub async fn report_compact_tasks_impl(
        &self,
        report_tasks: Vec<ReportTask>,
        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
    ) -> Result<Vec<bool>> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;
        let compaction: &mut Compaction = &mut compaction_guard;
        let start_time = Instant::now();
        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
        let mut rets = vec![false; report_tasks.len()];
        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
        // The compaction task is finished.
        let versioning: &mut Versioning = &mut versioning_guard;
        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");

        // purge stale compact_status
        for group_id in original_keys {
            if !versioning.current_version.levels.contains_key(&group_id) {
                compact_statuses.remove(group_id);
            }
        }
        let mut tasks = vec![];

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        let mut success_count = 0;
        for (idx, task) in report_tasks.into_iter().enumerate() {
            rets[idx] = true;
            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
                None => {
                    tracing::warn!("compact task {} not found", task.task_id);
                    rets[idx] = false;
                    continue;
                }
            };

            {
                // apply result
                compact_task.task_status = task.task_status;
                compact_task.sorted_output_ssts = task.sorted_output_ssts;
            }

            match compact_statuses.get_mut(compact_task.compaction_group_id) {
                Some(mut compact_status) => {
                    compact_status.report_compact_task(&compact_task);
                }
                None => {
                    // When the group_id is not found in the compaction_statuses, it means the group has been removed.
                    // The task is invalid and should be canceled.
                    // e.g.
                    // 1. The group is removed by the user unregistering the tables
                    // 2. The group is removed by the group scheduling algorithm
                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
                }
            }

            let is_success = if let TaskStatus::Success = compact_task.task_status {
                match self
                    .report_compaction_sanity_check(&task.object_timestamps)
                    .await
                {
                    Err(e) => {
                        warn!(
                            "failed to commit compaction task {} {}",
                            compact_task.task_id,
                            e.as_report()
                        );
                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
                        false
                    }
                    _ => {
                        let group = version
                            .latest_version()
                            .levels
                            .get(&compact_task.compaction_group_id)
                            .unwrap();
                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
                        if is_expired {
                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
                            warn!(
                                "The task may be expired because of group split, task:\n {:?}",
                                compact_task_to_string(&compact_task)
                            );
                        }
                        !is_expired
                    }
                }
            } else {
                false
            };
            if is_success {
                success_count += 1;
                version.apply_compact_task(&compact_task);
                if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version())
                {
                    self.metrics.version_stats.reset();
                    versioning.local_metrics.clear();
                }
                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
                trigger_local_table_stat(
                    &self.metrics,
                    &mut versioning.local_metrics,
                    &version_stats,
                    &task.table_stats_change,
                );
            }
            tasks.push(compact_task);
        }
        if success_count > 0 {
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;

            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_report_task"])
                .observe(success_count as f64);
        } else {
            // The compaction task is cancelled or failed.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment
            )?;
        }

        let mut success_groups = vec![];
        for compact_task in &tasks {
            self.compactor_manager
                .remove_task_heartbeat(compact_task.task_id);
            tracing::trace!(
                "Reported compaction task. {}. cost time: {:?}",
                compact_task_to_string(compact_task),
                start_time.elapsed(),
            );

            if !deterministic_mode
                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
            {
                // only try to send Dynamic compaction requests
                self.try_send_compaction_request(
                    compact_task.compaction_group_id,
                    compact_task::TaskType::Dynamic,
                );
            }

            if compact_task.task_status == TaskStatus::Success {
                success_groups.push(compact_task.compaction_group_id);
            }
        }

        trigger_compact_tasks_stat(
            &self.metrics,
            &tasks,
            &compaction.compaction_statuses,
            &versioning_guard.current_version,
        );
        drop(versioning_guard);
        if !success_groups.is_empty() {
            self.try_update_write_limits(&success_groups).await;
        }
        Ok(rets)
    }

    /// Triggers compaction for the specified compaction groups.
    /// Does not wait for the compaction to finish.
    pub async fn trigger_compaction_deterministic(
        &self,
        _base_version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        self.on_current_version(|old_version| {
            tracing::info!(
                "Trigger compaction for version {}, groups {:?}",
                old_version.id,
                compaction_groups
            );
        })
        .await;

        if compaction_groups.is_empty() {
            return Ok(());
        }
        for compaction_group in compaction_groups {
            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
        }
        Ok(())
    }

    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<()> {
        let start_time = Instant::now();

        // 1. Get idle compactor.
        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction No compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        // 2. Get manual compaction task.
        let compact_task = self
            .manual_get_compact_task(compaction_group, manual_compaction_option)
            .await;
        let compact_task = match compact_task {
            Ok(Some(compact_task)) => compact_task,
            Ok(None) => {
                // No compaction task available.
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
                    compaction_group
                )
                    .into());
            }
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };

        // 3. send task to compactor
        let compact_task_string = compact_task_to_string(&compact_task);
        // TODO: should we also cancel the task on the meta side?
        compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })?;

        tracing::info!(
            "Trigger manual compaction task. {}. cost time: {:?}",
            &compact_task_string,
            start_time.elapsed(),
        );

        Ok(())
    }

    /// Sends a compaction request.
    pub fn try_send_compaction_request(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) -> bool {
        match self
            .compaction_state
            .try_sched_compaction(compaction_group, task_type)
        {
            Ok(_) => true,
            Err(e) => {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to send compaction request for compaction group {}",
                    compaction_group,
                );
                false
            }
        }
    }

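    /// Decision ladder for per-table vnode partitioning (a sketch, using the meta
    /// options referenced in the body below):
    /// - table size > `compact_task_table_size_partition_threshold_high` and
    ///   `partition_vnode_count` > 0: partition by `partition_vnode_count`;
    /// - table size > the low threshold, or recent write throughput exceeds
    ///   `table_high_write_throughput_threshold` while the table outgrows
    ///   `target_file_size_base`, with `hybrid_partition_node_count` > 0:
    ///   partition by `hybrid_partition_node_count`;
    /// - table size > `target_file_size_base`: a single partition.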
    pub(crate) fn calculate_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
    ) {
        // Do not split SSTs by vnode partition when target_level > base_level.
        // The purpose of data alignment is mainly to improve the parallelism of base-level
        // compaction and reduce write amplification. At higher levels, SST files are usually
        // larger and contain data of only a single table_id, so there is no need to split them.
        if compact_task.target_level > compact_task.base_level {
            return;
        }
        if compaction_config.split_weight_by_vnode > 0 {
            for table_id in &compact_task.existing_table_ids {
                compact_task
                    .table_vnode_partition
                    .insert(*table_id, compact_task.split_weight_by_vnode);
            }
        } else {
            let mut table_size_info: HashMap<u32, u64> = HashMap::default();
            let mut existing_table_ids: HashSet<u32> = HashSet::default();
            for input_ssts in &compact_task.input_ssts {
                for sst in &input_ssts.table_infos {
                    existing_table_ids.extend(sst.table_ids.iter());
                    for table_id in &sst.table_ids {
                        *table_size_info.entry(*table_id).or_default() +=
                            sst.sst_size / (sst.table_ids.len() as u64);
                    }
                }
            }
            compact_task
                .existing_table_ids
                .retain(|table_id| existing_table_ids.contains(table_id));

            let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
            let default_partition_count = self.env.opts.partition_vnode_count;
            // We must ensure the partition threshold is large enough to avoid producing too many small files.
            let compact_task_table_size_partition_threshold_low = self
                .env
                .opts
                .compact_task_table_size_partition_threshold_low;
            let compact_task_table_size_partition_threshold_high = self
                .env
                .opts
                .compact_task_table_size_partition_threshold_high;
            // check latest write throughput
            let table_write_throughput_statistic_manager =
                self.table_write_throughput_statistic_manager.read();
            let timestamp = chrono::Utc::now().timestamp();
            for (table_id, compact_table_size) in table_size_info {
                let write_throughput = table_write_throughput_statistic_manager
                    .get_table_throughput_descending(table_id, timestamp)
                    .peekable()
                    .peek()
                    .map(|item| item.throughput)
                    .unwrap_or(0);
                if compact_table_size > compact_task_table_size_partition_threshold_high
                    && default_partition_count > 0
                {
                    compact_task
                        .table_vnode_partition
                        .insert(table_id, default_partition_count);
                } else if (compact_table_size > compact_task_table_size_partition_threshold_low
                    || (write_throughput > self.env.opts.table_high_write_throughput_threshold
                        && compact_table_size > compaction_config.target_file_size_base))
                    && hybrid_vnode_count > 0
                {
                    // Partition tables with high write throughput, while making sure the partitions do not become too small.
                    compact_task
                        .table_vnode_partition
                        .insert(table_id, hybrid_vnode_count);
                } else if compact_table_size > compaction_config.target_file_size_base {
                    // partition for small table
                    compact_task.table_vnode_partition.insert(table_id, 1);
                }
            }
            compact_task
                .table_vnode_partition
                .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
        }
    }

    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockManager {
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction_guard = self.compaction.read().await;
        let assignment_ref = &compaction_guard.compact_task_assignment;
        assignment_ref.get(&task_id).cloned()
    }

    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let mut guard = self.compaction.write().await;
            guard.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0,
                },
            );
        }

        // In tests, the contents of the compact task may have been modified directly,
        // while compact_task_assignment was not. So we insert the modified compact task
        // above and report it via `report_compact_tasks`.
        self.report_compact_tasks(vec![ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        }])
        .await?;
        Ok(())
    }
}

#[derive(Debug, Default)]
pub struct CompactionState {
    scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
}

impl CompactionState {
    pub fn new() -> Self {
        Self {
            scheduled: Default::default(),
        }
    }

    /// Enqueues only if the target is not yet in queue.
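    ///
    /// Returns `Ok(true)` if the (group, task type) pair was newly scheduled and
    /// `Ok(false)` if an identical request is already queued. A minimal sketch
    /// (illustrative only, not compiled):
    ///
    /// ```ignore
    /// let state = CompactionState::new();
    /// assert!(state.try_sched_compaction(group_id, TaskType::Dynamic).unwrap());
    /// assert!(!state.try_sched_compaction(group_id, TaskType::Dynamic).unwrap()); // deduplicated
    /// state.unschedule(group_id, TaskType::Dynamic);
    /// ```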
    pub fn try_sched_compaction(
        &self,
        compaction_group: CompactionGroupId,
        task_type: TaskType,
    ) -> std::result::Result<bool, SendError<CompactionRequestChannelItem>> {
        let mut guard = self.scheduled.lock();
        let key = (compaction_group, task_type);
        if guard.contains(&key) {
            return Ok(false);
        }
        guard.insert(key);
        Ok(true)
    }

    pub fn unschedule(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) {
        self.scheduled.lock().remove(&(compaction_group, task_type));
    }

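    /// Picks the highest-priority scheduled task type for `group`, in the fixed order
    /// `Dynamic > SpaceReclaim > Ttl > Tombstone > VnodeWatermark`.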
1325    pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
1326        let guard = self.scheduled.lock();
1327        if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
1328            Some(compact_task::TaskType::Dynamic)
1329        } else if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
1330            Some(compact_task::TaskType::SpaceReclaim)
1331        } else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
1332            Some(compact_task::TaskType::Ttl)
1333        } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
1334            Some(compact_task::TaskType::Tombstone)
1335        } else if guard.contains(&(group, compact_task::TaskType::VnodeWatermark)) {
1336            Some(compact_task::TaskType::VnodeWatermark)
1337        } else {
1338            None
1339        }
1340    }
1341}
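
// Illustrative sketch (comments only, not compiled): `CompactionState` deduplicates
// pending compaction triggers per `(compaction group, task type)` key, and
// `auto_pick_type` surfaces queued types in a fixed priority order
// (Dynamic > SpaceReclaim > Ttl > Tombstone > VnodeWatermark). Assuming some
// `group_id: CompactionGroupId`:
//
//     let state = CompactionState::new();
//     assert!(state.try_sched_compaction(group_id, TaskType::Ttl).unwrap());
//     // Re-enqueueing the same (group, type) key is a no-op:
//     assert!(!state.try_sched_compaction(group_id, TaskType::Ttl).unwrap());
//     assert_eq!(state.auto_pick_type(group_id), Some(TaskType::Ttl));
//     state.unschedule(group_id, TaskType::Ttl);
//     assert_eq!(state.auto_pick_type(group_id), None);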

impl Compaction {
    pub fn get_compact_task_assignments_by_group_id(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<CompactTaskAssignment> {
        self.compact_task_assignment
            .iter()
            .filter_map(|(_, assignment)| {
                if assignment
                    .compact_task
                    .as_ref()
                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
                {
                    Some(CompactTaskAssignment {
                        compact_task: assignment.compact_task.clone(),
                        context_id: assignment.context_id,
                    })
                } else {
                    None
                }
            })
            .collect()
    }
}

#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    pub group_id: CompactionGroupId,
    pub group_size: u64,
    pub table_statistic: BTreeMap<StateTableId, u64>,
    pub compaction_group_config: CompactionGroup,
}

/// Updates table stats caused by vnode watermark trivial reclaim compaction.
fn update_table_stats_for_vnode_watermark_trivial_reclaim(
    table_stats: &mut PbTableStatsMap,
    task: &CompactTask,
) {
    if task.task_type != TaskType::VnodeWatermark {
        return;
    }
    let mut deleted_table_keys: HashMap<u32, u64> = HashMap::default();
    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
        assert_eq!(s.table_ids.len(), 1);
        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
        *e += s.total_key_count;
    }
    for (table_id, delete_count) in deleted_table_keys {
        let Some(stats) = table_stats.get_mut(&table_id) else {
            continue;
        };
        if stats.total_key_count == 0 {
            continue;
        }
        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
        // total_key_count is updated accurately.
        stats.total_key_count = new_total_key_count;
        // others are updated approximately.
        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
    }
}
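
// Worked example (hypothetical numbers): if a table's stats are
// { total_key_count: 1000, total_key_size: 8000, total_value_size: 40000 } and a
// vnode-watermark trivial reclaim drops input SSTs holding 250 of its keys, then
// ratio = 750 / 1000 = 0.75 and the stats become
// { total_key_count: 750, total_key_size: 6000, total_value_size: 30000 }.
// Only the key count is exact; the sizes are scaled proportionally because the
// reclaimed SSTs' per-table byte counts are not tracked here.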

#[derive(Debug, Clone)]
pub enum GroupState {
    /// The compaction group is not in emergency state.
    Normal,

    /// The compaction group is in emergency state.
    Emergency(String), // reason

    /// The compaction group is in write stop state.
    WriteStop(String), // reason
}

impl GroupState {
    pub fn is_write_stop(&self) -> bool {
        matches!(self, Self::WriteStop(_))
    }

    pub fn is_emergency(&self) -> bool {
        matches!(self, Self::Emergency(_))
    }

    pub fn reason(&self) -> Option<&str> {
        match self {
            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
            _ => None,
        }
    }
}
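
// Illustrative sketch (comments only, not compiled): callers typically branch on
// the returned state and surface the reason string, e.g.
//
//     let state = GroupStateValidator::group_state(&levels, &compaction_config);
//     if state.is_write_stop() {
//         tracing::warn!("group write stopped: {}", state.reason().unwrap_or_default());
//     } else if state.is_emergency() {
//         // Prefer emergency task picking for this group.
//     }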

#[derive(Clone, Default)]
pub struct GroupStateValidator;

impl GroupStateValidator {
    pub fn write_stop_sub_level_count(
        level_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
        level_count > threshold
    }

    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
        l0_size
            > compaction_config
                .level0_stop_write_threshold_max_size
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
    }

    pub fn write_stop_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .level0_stop_write_threshold_max_sst_count
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
                as usize
    }

    pub fn emergency_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .emergency_level0_sst_file_count
                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
                as usize
    }

    pub fn emergency_l0_partition_count(
        last_l0_sub_level_partition_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        last_l0_sub_level_partition_count
            > compaction_config
                .emergency_level0_sub_level_partition
                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
                as usize
    }

    pub fn check_single_group_write_stop(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
                levels.l0.sub_levels.len(),
                compaction_config.level0_stop_write_threshold_sub_level_number
            ));
        }

        if Self::write_stop_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .level0_stop_write_threshold_max_sst_count
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
            ));
        }

        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_size: {}, threshold: {}) L0 size too large",
                levels.l0.total_file_size,
                compaction_config
                    .level0_stop_write_threshold_max_size
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
            ));
        }

        GroupState::Normal
    }

    pub fn check_single_group_emergency(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::emergency_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .emergency_level0_sst_file_count
                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
            ));
        }

        if Self::emergency_l0_partition_count(
            levels
                .l0
                .sub_levels
                .first()
                .map(|l| l.table_infos.len())
                .unwrap_or(0),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
                levels
                    .l0
                    .sub_levels
                    .first()
                    .map(|l| l.table_infos.len())
                    .unwrap_or(0),
                compaction_config
                    .emergency_level0_sub_level_partition
                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
            ));
        }

        GroupState::Normal
    }

    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
        let state = Self::check_single_group_write_stop(levels, compaction_config);
        if state.is_write_stop() {
            return state;
        }

        Self::check_single_group_emergency(levels, compaction_config)
    }
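
    // Worked example (hypothetical config value): with
    // level0_stop_write_threshold_sub_level_number = 300, a group whose L0 holds
    // 301 sub-levels is classified as `WriteStop` by `check_single_group_write_stop`,
    // and `group_state` short-circuits before `check_single_group_emergency` is
    // consulted; all thresholds here use strict `>` comparisons, so exactly
    // reaching a threshold does not trip the state.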
1597}