risingwave_meta/hummock/manager/compaction/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::Instant;

use anyhow::Context;
use compaction_event_loop::{
    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
    HummockCompactorDedicatedEventLoop,
};
use fail::fail_point;
use itertools::Itertools;
use parking_lot::Mutex;
use rand::rng as thread_rng;
use rand::seq::SliceRandom;
use risingwave_common::config::default::compaction_config;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::level::Levels;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
};
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockSstableId, HummockSstableObjectId,
    HummockVersionId, compact_task_to_string, statistics_compact_task,
};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
};
use thiserror_ext::AsReport;
use tokio::sync::RwLockWriteGuard;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;
use tracing::warn;

use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::{
    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
};
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::{
    HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
    trigger_local_table_stat,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::next_compaction_task_id;
use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
use crate::manager::META_NODE_ID;
use crate::model::BTreeMapTransaction;

pub mod compaction_event_loop;
pub mod compaction_group_manager;
pub mod compaction_group_schedule;

static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
    [
        TaskStatus::ManualCanceled,
        TaskStatus::SendFailCanceled,
        TaskStatus::AssignFailCanceled,
        TaskStatus::HeartbeatCanceled,
        TaskStatus::InvalidGroupCanceled,
        TaskStatus::NoAvailMemoryResourceCanceled,
        TaskStatus::NoAvailCpuResourceCanceled,
        TaskStatus::HeartbeatProgressCanceled,
    ]
    .into_iter()
    .collect()
});

type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);

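/// Builds the default [`CompactionSelector`] for each background compaction task type.
/// Manual compaction is not registered here; `manual_get_compact_task` constructs a
/// `ManualCompactionSelector` on demand. A sketch of a hypothetical caller:
///
/// ```ignore
/// let mut selectors = init_selectors();
/// let selector = selectors
///     .get_mut(&compact_task::TaskType::Dynamic)
///     .expect("dynamic selector is always registered");
/// ```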
fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
        HashMap::default();
    compaction_selectors.insert(
        compact_task::TaskType::Dynamic,
        Box::<DynamicLevelSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::SpaceReclaim,
        Box::<SpaceReclaimCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Ttl,
        Box::<TtlCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Tombstone,
        Box::<TombstoneCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::VnodeWatermark,
        Box::<VnodeWatermarkCompactionSelector>::default(),
    );
    compaction_selectors
}

impl HummockVersionTransaction<'_> {
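    /// Translates a finished compact task into a version delta: one `IntraLevelDelta` per input
    /// level removing the consumed SSTs, plus a final `IntraLevelDelta` inserting the sorted
    /// output SSTs into the target (sub-)level. The delta is then pre-applied to this
    /// transaction's latest version.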
    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
        let mut version_delta = self.new_delta();
        let trivial_move = compact_task.is_trivial_move_task();
        version_delta.trivial_move = trivial_move;

        let group_deltas = &mut version_delta
            .group_deltas
            .entry(compact_task.compaction_group_id)
            .or_default()
            .group_deltas;
        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
            BTreeMap::default();

        for level in &compact_task.input_ssts {
            let level_idx = level.level_idx;

            removed_table_ids_map
                .entry(level_idx)
                .or_default()
                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
        }

        for (level_idx, removed_table_ids) in removed_table_ids_map {
            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
                level_idx,
                0, // default
                removed_table_ids,
                vec![], // default
                0,      // default
                compact_task.compaction_group_version_id,
            ));

            group_deltas.push(group_delta);
        }

        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
            compact_task.target_level,
            compact_task.target_sub_level_id,
            HashSet::new(), // default
            compact_task.sorted_output_ssts.clone(),
            compact_task.split_weight_by_vnode,
            compact_task.compaction_group_version_id,
        ));

        group_deltas.push(group_delta);
        version_delta.pre_apply();
    }
}

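/// In-memory compaction bookkeeping held by the [`HummockManager`]; both maps are persisted
/// through meta-store transactions in the methods below.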
#[derive(Default)]
pub struct Compaction {
    /// Compaction task that is already assigned to a compactor
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// `CompactStatus` of each compaction group
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    pub _deterministic_mode: bool,
}

impl HummockManager {
    pub async fn get_assigned_compact_task_num(&self) -> u64 {
        self.compaction.read().await.compact_task_assignment.len() as u64
    }

    pub async fn list_compaction_status(
        &self,
    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
        let compaction = self.compaction.read().await;
        (
            compaction.compaction_statuses.values().map_into().collect(),
            compaction
                .compact_task_assignment
                .values()
                .cloned()
                .collect(),
        )
    }

    pub async fn get_compaction_scores(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<PickerInfo> {
        let (status, levels, group) = {
            let compaction = self.compaction.read().await;
            let versioning = self.versioning.read().await;
            let config_manager = self.compaction_group_manager.read().await;
            match (
                compaction.compaction_statuses.get(&compaction_group_id),
                versioning.current_version.levels.get(&compaction_group_id),
                config_manager.try_get_compaction_group_config(compaction_group_id),
            ) {
                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
                _ => {
                    return vec![];
                }
            }
        };
        let dynamic_level_core = DynamicLevelSelectorCore::new(
            group.compaction_config,
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
        ctx.score_levels
    }
}

impl HummockManager {
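    /// Spawns the compaction event loops and returns their join handles paired with shutdown
    /// senders: a dedicated loop for compactor-bound events, plus the main loop that dispatches
    /// the `SubscribeCompactionEventRequest` streams registered via
    /// [`Self::add_compactor_stream`].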
    pub fn compaction_event_loop(
        hummock_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let hummock_compaction_event_handler =
            HummockCompactionEventHandler::new(hummock_manager.clone());

        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
            hummock_manager.clone(),
            hummock_compaction_event_handler.clone(),
        );

        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));

        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
            hummock_manager.env.opts.clone(),
            hummock_compaction_event_handler,
            Some(event_tx),
        );

        let event_loop = HummockCompactionEventLoop::new(
            hummock_compaction_event_dispatcher,
            hummock_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

    pub fn add_compactor_stream(
        &self,
        context_id: u32,
        req_stream: Streaming<SubscribeCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

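    /// Iterates over all compaction groups in random order and returns the first group that has
    /// a scheduled compaction request, together with the picked task type; `None` if no group
    /// has a pending request.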
    pub async fn auto_pick_compaction_group_and_type(
        &self,
    ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                return Some((cg_id, pick_type));
            }
        }

        None
    }

    /// Returns compaction group ids in random order, together with a task type. Groups whose
    /// picked task type is `TaskType::Dynamic` are accumulated and returned together. If a group
    /// with any other task type (e.g. `TaskType::Emergency`, for groups blocked by `write_limit`)
    /// is encountered before any dynamic group, that single group is returned with its task type.
    async fn auto_pick_compaction_groups_and_type(
        &self,
    ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        let mut normal_groups = vec![];
        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                if pick_type == TaskType::Dynamic {
                    normal_groups.push(cg_id);
                } else if normal_groups.is_empty() {
                    return (vec![cg_id], pick_type);
                }
            }
        }
        (normal_groups, TaskType::Dynamic)
    }
}

impl HummockManager {
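    /// Picks at most `max_select_count` compact tasks from `compaction_groups` using `selector`.
    ///
    /// Trivial-move and trivial-reclaim tasks are applied to the version immediately and returned
    /// with `TaskStatus::Success`; other picked tasks are persisted as assignments and returned
    /// with `TaskStatus::Pending`. The second element of the returned tuple lists the groups for
    /// which no task was scheduled in this call.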
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        // Apply stats changes.
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // When the last table of a compaction group is deleted, the compaction group (and its
            // config) is destroyed as well. A compaction task for this group may then arrive later
            // and fail to find its config.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            // StoredIdGenerator already implements id pre-allocation via ID_PREALLOCATE_INTERVAL.
            let task_id = next_compaction_task_id(&self.env).await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                // lazy initialize.
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .map(|table_id| table_id.table_id)
                .collect();

            let mut table_id_to_option: HashMap<u32, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // Only GC delete keys when compacting into the last level, because deeper
                    // levels may still contain older versions of a key.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                    );
                    let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = version
                        .latest_version()
                        .safe_epoch_table_watermarks(&compact_task.existing_table_ids)
                        .into_iter()
                        .partition(|(_table_id, table_watermark)| {
                            matches!(
                                table_watermark.watermark_type,
                                WatermarkSerdeType::PkPrefix
                            )
                        });

                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;

                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            let id = (*table_id).try_into().unwrap();
                            all_versioned_table_schemas.get(&id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID, // deprecated
                        },
                    );

                    pick_tasks.push(compact_task);
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // We are using a single transaction to ensure that each task has progress when it is
            // created.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Initiate heartbeat for the task to track its progress.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // The task was just picked and is awaiting execution by a compactor.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }

    /// Cancels a compaction task, whether it is assigned or unassigned.
    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore err")
        )));
        let ret = self
            .cancel_compact_task_impl(vec![task_id], task_status)
            .await?;
        Ok(ret[0])
    }

    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }

    async fn cancel_compact_task_impl(
        &self,
        task_ids: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        assert!(CANCEL_STATUS_SET.contains(&task_status));
        let tasks = task_ids
            .into_iter()
            .map(|task_id| ReportTask {
                task_id,
                task_status,
                sorted_output_ssts: vec![],
                table_stats_change: HashMap::default(),
                object_timestamps: HashMap::default(),
            })
            .collect_vec();
        let rets = self.report_compact_tasks(tasks).await?;
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(rets)
    }

    async fn get_compact_tasks(
        &self,
        mut compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));
        compaction_groups.shuffle(&mut thread_rng());
        let (mut tasks, groups) = self
            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
            .await?;
        tasks.retain(|task| {
            if task.task_status == TaskStatus::Success {
                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
                false
            } else {
                true
            }
        });
        Ok((tasks, groups))
    }

    pub async fn get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<Option<CompactTask>> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));

        let (normal_tasks, _) = self
            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
            .await?;
        for task in normal_tasks {
            if task.task_status != TaskStatus::Success {
                return Ok(Some(task));
            }
            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
        }
        Ok(None)
    }

    pub async fn manual_get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<Option<CompactTask>> {
        let mut selector: Box<dyn CompactionSelector> =
            Box::new(ManualCompactionSelector::new(manual_compaction_option));
        self.get_compact_task(compaction_group_id, &mut selector)
            .await
    }

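    /// Convenience wrapper around [`Self::report_compact_tasks`] for reporting a single task;
    /// returns whether the report was accepted.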
    pub async fn report_compact_task(
        &self,
        task_id: u64,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
        object_timestamps: HashMap<HummockSstableObjectId, u64>,
    ) -> Result<bool> {
        let rets = self
            .report_compact_tasks(vec![ReportTask {
                task_id,
                task_status,
                sorted_output_ssts,
                table_stats_change: table_stats_change.unwrap_or_default(),
                object_timestamps,
            }])
            .await?;
        Ok(rets[0])
    }

    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
        let compaction_guard = self.compaction.write().await;
        let versioning_guard = self.versioning.write().await;

        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
            .await
    }

    /// Finishes or cancels a compaction task, according to `task_status`.
    ///
    /// If `context_id` is not `None`, its validity is checked when writing to the meta store,
    /// along with its ownership of the task.
    ///
    /// Returning `Ok(false)` indicates that either the task was not found, or the task is not
    /// owned by `context_id` when `context_id` is not `None`.
    pub async fn report_compact_tasks_impl(
        &self,
        report_tasks: Vec<ReportTask>,
        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
    ) -> Result<Vec<bool>> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;
        let compaction: &mut Compaction = &mut compaction_guard;
        let start_time = Instant::now();
        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
        let mut rets = vec![false; report_tasks.len()];
        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
        // The compaction task is finished.
        let versioning: &mut Versioning = &mut versioning_guard;
        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");

        // purge stale compact_status
        for group_id in original_keys {
            if !versioning.current_version.levels.contains_key(&group_id) {
                compact_statuses.remove(group_id);
            }
        }
        let mut tasks = vec![];

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        let mut success_count = 0;
        for (idx, task) in report_tasks.into_iter().enumerate() {
            rets[idx] = true;
            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
                None => {
                    tracing::warn!("compact task {} not found", task.task_id);
                    rets[idx] = false;
                    continue;
                }
            };

            {
                // apply result
                compact_task.task_status = task.task_status;
                compact_task.sorted_output_ssts = task.sorted_output_ssts;
            }

            match compact_statuses.get_mut(compact_task.compaction_group_id) {
                Some(mut compact_status) => {
                    compact_status.report_compact_task(&compact_task);
                }
                None => {
                    // When the group_id is not found in the compaction_statuses, it means the group has been removed.
                    // The task is invalid and should be canceled.
                    // e.g.
                    // 1. The group is removed by the user unregistering the tables
                    // 2. The group is removed by the group scheduling algorithm
                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
                }
            }

            let is_success = if let TaskStatus::Success = compact_task.task_status {
                match self
                    .report_compaction_sanity_check(&task.object_timestamps)
                    .await
                {
                    Err(e) => {
                        warn!(
                            "failed to commit compaction task {} {}",
                            compact_task.task_id,
                            e.as_report()
                        );
                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
                        false
                    }
                    _ => {
                        let group = version
                            .latest_version()
                            .levels
                            .get(&compact_task.compaction_group_id)
                            .unwrap();
                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
                        if is_expired {
                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
                            warn!(
                                "The task may be expired because of group split, task:\n {:?}",
                                compact_task_to_string(&compact_task)
                            );
                        }
                        !is_expired
                    }
                }
            } else {
                false
            };
            if is_success {
                success_count += 1;
                version.apply_compact_task(&compact_task);
                if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version())
                {
                    self.metrics.version_stats.reset();
                    versioning.local_metrics.clear();
                }
                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
                trigger_local_table_stat(
                    &self.metrics,
                    &mut versioning.local_metrics,
                    &version_stats,
                    &task.table_stats_change,
                );
            }
            tasks.push(compact_task);
        }
        if success_count > 0 {
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;

            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_report_task"])
                .observe(success_count as f64);
        } else {
            // The compaction task is cancelled or failed.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment
            )?;
        }

        let mut success_groups = vec![];
        for compact_task in &tasks {
            self.compactor_manager
                .remove_task_heartbeat(compact_task.task_id);
            tracing::trace!(
                "Reported compaction task. {}. cost time: {:?}",
                compact_task_to_string(compact_task),
                start_time.elapsed(),
            );

            if !deterministic_mode
                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
            {
                // only try send Dynamic compaction
                self.try_send_compaction_request(
                    compact_task.compaction_group_id,
                    compact_task::TaskType::Dynamic,
                );
            }

            if compact_task.task_status == TaskStatus::Success {
                success_groups.push(compact_task.compaction_group_id);
            }
        }

        trigger_compact_tasks_stat(
            &self.metrics,
            &tasks,
            &compaction.compaction_statuses,
            &versioning_guard.current_version,
        );
        drop(versioning_guard);
        if !success_groups.is_empty() {
            self.try_update_write_limits(&success_groups).await;
        }
        Ok(rets)
    }

    /// Triggers compactions for the specified compaction groups.
    /// Does not wait for compaction to finish.
    pub async fn trigger_compaction_deterministic(
        &self,
        _base_version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        self.on_current_version(|old_version| {
            tracing::info!(
                "Trigger compaction for version {}, groups {:?}",
                old_version.id,
                compaction_groups
            );
        })
        .await;

        if compaction_groups.is_empty() {
            return Ok(());
        }
        for compaction_group in compaction_groups {
            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
        }
        Ok(())
    }

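    /// Picks a manual compaction task for `compaction_group` and sends it to an idle compactor.
    /// A sketch of a hypothetical caller (the option fields shown are illustrative):
    ///
    /// ```ignore
    /// let option = ManualCompactionOption {
    ///     level: 0,
    ///     ..Default::default()
    /// };
    /// hummock_manager
    ///     .trigger_manual_compaction(group_id, option)
    ///     .await?;
    /// ```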
    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<()> {
        let start_time = Instant::now();

        // 1. Get idle compactor.
        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction No compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        // 2. Get manual compaction task.
        let compact_task = self
            .manual_get_compact_task(compaction_group, manual_compaction_option)
            .await;
        let compact_task = match compact_task {
            Ok(Some(compact_task)) => compact_task,
            Ok(None) => {
                // No compaction task available.
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction No compaction_task is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };

        // 3. Send the task to the compactor.
        let compact_task_string = compact_task_to_string(&compact_task);
        // TODO: should we also cancel the task on the meta side?
        compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })?;

        tracing::info!(
            "Trigger manual compaction task. {}. cost time: {:?}",
            &compact_task_string,
            start_time.elapsed(),
        );

        Ok(())
    }

    /// Sends a compaction request.
    pub fn try_send_compaction_request(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) -> bool {
        match self
            .compaction_state
            .try_sched_compaction(compaction_group, task_type)
        {
            Ok(_) => true,
            Err(e) => {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to send compaction request for compaction group {}",
                    compaction_group,
                );
                false
            }
        }
    }

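    /// Decides how many vnode partitions each table in `compact_task` should be split into,
    /// based on the group's `split_weight_by_vnode`, each table's compacted size, and its recent
    /// write throughput. Only takes effect when the target level is not above the base level.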
    pub(crate) fn calculate_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
    ) {
        // Do not split SSTs by vnode partition when target_level > base_level.
        // The purpose of data alignment is mainly to improve the parallelism of base-level
        // compaction and reduce write amplification. At higher levels, SST files are usually
        // larger and contain data of only a single table_id, so there is no need to split them.
        if compact_task.target_level > compact_task.base_level {
            return;
        }
        if compaction_config.split_weight_by_vnode > 0 {
            for table_id in &compact_task.existing_table_ids {
                compact_task
                    .table_vnode_partition
                    .insert(*table_id, compact_task.split_weight_by_vnode);
            }
        } else {
            let mut table_size_info: HashMap<u32, u64> = HashMap::default();
            let mut existing_table_ids: HashSet<u32> = HashSet::default();
            for input_ssts in &compact_task.input_ssts {
                for sst in &input_ssts.table_infos {
                    existing_table_ids.extend(sst.table_ids.iter());
                    for table_id in &sst.table_ids {
                        *table_size_info.entry(*table_id).or_default() +=
                            sst.sst_size / (sst.table_ids.len() as u64);
                    }
                }
            }
            compact_task
                .existing_table_ids
                .retain(|table_id| existing_table_ids.contains(table_id));

            let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
            let default_partition_count = self.env.opts.partition_vnode_count;
            // We must ensure the partition threshold is large enough to avoid producing too many small files.
            let compact_task_table_size_partition_threshold_low = self
                .env
                .opts
                .compact_task_table_size_partition_threshold_low;
            let compact_task_table_size_partition_threshold_high = self
                .env
                .opts
                .compact_task_table_size_partition_threshold_high;
            // check latest write throughput
            let table_write_throughput_statistic_manager =
                self.table_write_throughput_statistic_manager.read();
            let timestamp = chrono::Utc::now().timestamp();
            for (table_id, compact_table_size) in table_size_info {
                let write_throughput = table_write_throughput_statistic_manager
                    .get_table_throughput_descending(table_id, timestamp)
                    .peekable()
                    .peek()
                    .map(|item| item.throughput)
                    .unwrap_or(0);
                if compact_table_size > compact_task_table_size_partition_threshold_high
                    && default_partition_count > 0
                {
                    compact_task
                        .table_vnode_partition
                        .insert(table_id, default_partition_count);
                } else if (compact_table_size > compact_task_table_size_partition_threshold_low
                    || (write_throughput > self.env.opts.table_high_write_throughput_threshold
                        && compact_table_size > compaction_config.target_file_size_base))
                    && hybrid_vnode_count > 0
                {
                    // Partition tables with high write throughput, while ensuring the table is not too small.
                    compact_task
                        .table_vnode_partition
                        .insert(table_id, hybrid_vnode_count);
                } else if compact_table_size > compaction_config.target_file_size_base {
                    // partition for small table
                    compact_task.table_vnode_partition.insert(table_id, 1);
                }
            }
            compact_task
                .table_vnode_partition
                .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
        }
    }

    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockManager {
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction_guard = self.compaction.read().await;
        let assignment_ref = &compaction_guard.compact_task_assignment;
        assignment_ref.get(&task_id).cloned()
    }

    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let mut guard = self.compaction.write().await;
            guard.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0,
                },
            );
        }

        // In tests, the contents of the compact task may have been modified directly, while
        // compact_task_assignment was left unmodified. So we pass the modified compact_task
        // directly into `report_compact_tasks`.
        self.report_compact_tasks(vec![ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        }])
        .await?;
        Ok(())
    }
}

#[derive(Debug, Default)]
pub struct CompactionState {
    scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
}

impl CompactionState {
    pub fn new() -> Self {
        Self {
            scheduled: Default::default(),
        }
    }

    /// Enqueues only if the target is not yet in queue.
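    ///
    /// A minimal usage sketch (the group id is hypothetical):
    /// ```ignore
    /// let state = CompactionState::new();
    /// assert!(state.try_sched_compaction(2, TaskType::Dynamic).unwrap());
    /// // A duplicate request for the same (group, type) pair is not enqueued again.
    /// assert!(!state.try_sched_compaction(2, TaskType::Dynamic).unwrap());
    /// ```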
    pub fn try_sched_compaction(
        &self,
        compaction_group: CompactionGroupId,
        task_type: TaskType,
    ) -> std::result::Result<bool, SendError<CompactionRequestChannelItem>> {
        let mut guard = self.scheduled.lock();
        let key = (compaction_group, task_type);
        if guard.contains(&key) {
            return Ok(false);
        }
        guard.insert(key);
        Ok(true)
    }

    pub fn unschedule(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) {
        self.scheduled.lock().remove(&(compaction_group, task_type));
    }

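    /// Returns the scheduled task type with the highest priority for `group`, checked in a fixed
    /// order: `Dynamic`, then `SpaceReclaim`, `Ttl`, `Tombstone`, and `VnodeWatermark`.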
    pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
        let guard = self.scheduled.lock();
        if guard.contains(&(group, TaskType::Dynamic)) {
            Some(TaskType::Dynamic)
        } else if guard.contains(&(group, TaskType::SpaceReclaim)) {
            Some(TaskType::SpaceReclaim)
        } else if guard.contains(&(group, TaskType::Ttl)) {
            Some(TaskType::Ttl)
        } else if guard.contains(&(group, TaskType::Tombstone)) {
            Some(TaskType::Tombstone)
        } else if guard.contains(&(group, TaskType::VnodeWatermark)) {
            Some(TaskType::VnodeWatermark)
        } else {
            None
        }
    }
1338}
1339
1340impl Compaction {
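    /// Returns a clone of every assignment whose compact task targets `compaction_group_id`.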
1341    pub fn get_compact_task_assignments_by_group_id(
1342        &self,
1343        compaction_group_id: CompactionGroupId,
1344    ) -> Vec<CompactTaskAssignment> {
        self.compact_task_assignment
            .values()
            .filter(|assignment| {
                assignment
                    .compact_task
                    .as_ref()
                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
            })
            .cloned()
            .collect()
1362    }
1363}
1364
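/// Aggregated size statistics for a single compaction group: total size, per-state-table
/// sizes, and the group's compaction config.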
1365#[derive(Clone, Default)]
1366pub struct CompactionGroupStatistic {
1367    pub group_id: CompactionGroupId,
1368    pub group_size: u64,
1369    pub table_statistic: BTreeMap<StateTableId, u64>,
1370    pub compaction_group_config: CompactionGroup,
1371}
1372
1373/// Updates table stats caused by vnode watermark trivial reclaim compaction.
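///
/// ```ignore
/// // Sketch of the accounting for one table: 1000 keys before, 200 keys reclaimed.
/// // total_key_count: 1000 - 200 = 800             (exact)
/// // ratio          : 800 / 1000 = 0.8
/// // total_key_size / total_value_size: old * 0.8  (approximate, rounded up)
/// ```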
1374fn update_table_stats_for_vnode_watermark_trivial_reclaim(
1375    table_stats: &mut PbTableStatsMap,
1376    task: &CompactTask,
1377) {
1378    if task.task_type != TaskType::VnodeWatermark {
1379        return;
1380    }
1381    let mut deleted_table_keys: HashMap<u32, u64> = HashMap::default();
1382    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
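        // Each input SST of a vnode-watermark reclaim task is expected to belong to exactly
        // one state table.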
1383        assert_eq!(s.table_ids.len(), 1);
1384        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
1385        *e += s.total_key_count;
1386    }
1387    for (table_id, delete_count) in deleted_table_keys {
1388        let Some(stats) = table_stats.get_mut(&table_id) else {
1389            continue;
1390        };
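        // Guard against a zero denominator in the `ratio` computation below.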
1391        if stats.total_key_count == 0 {
1392            continue;
1393        }
1394        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
1395        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
1396        // total_key_count is updated accurately.
1397        stats.total_key_count = new_total_key_count;
1398        // others are updated approximately.
1399        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
1400        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
1401    }
1402}
1403
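/// Throttling state of a compaction group, as derived by `GroupStateValidator`.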
1404#[derive(Debug, Clone)]
1405pub enum GroupState {
1406    /// The compaction group is not in emergency state.
1407    Normal,
1408
1409    /// The compaction group is in emergency state.
1410    Emergency(String), // reason
1411
1412    /// The compaction group is in write stop state.
1413    WriteStop(String), // reason
1414}
1415
1416impl GroupState {
1417    pub fn is_write_stop(&self) -> bool {
1418        matches!(self, Self::WriteStop(_))
1419    }
1420
1421    pub fn is_emergency(&self) -> bool {
1422        matches!(self, Self::Emergency(_))
1423    }
1424
1425    pub fn reason(&self) -> Option<&str> {
1426        match self {
1427            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
1428            _ => None,
1429        }
1430    }
1431}
1432
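/// Stateless helpers that compare `Levels` statistics against `CompactionConfig` thresholds
/// (falling back to compile-time defaults for optional fields) to decide whether a compaction
/// group should stop accepting writes or enter the emergency state.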
1433#[derive(Clone, Default)]
1434pub struct GroupStateValidator;
1435
1436impl GroupStateValidator {
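    /// Whether the L0 sub-level count exceeds the write-stop threshold.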
1437    pub fn write_stop_sub_level_count(
1438        level_count: usize,
1439        compaction_config: &CompactionConfig,
1440    ) -> bool {
1441        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
1442        level_count > threshold
1443    }
1444
1445    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
1446        l0_size
1447            > compaction_config
1448                .level0_stop_write_threshold_max_size
1449                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1450    }
1451
1452    pub fn write_stop_l0_file_count(
1453        l0_file_count: usize,
1454        compaction_config: &CompactionConfig,
1455    ) -> bool {
1456        l0_file_count
1457            > compaction_config
1458                .level0_stop_write_threshold_max_sst_count
1459                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
1460                as usize
1461    }
1462
1463    pub fn emergency_l0_file_count(
1464        l0_file_count: usize,
1465        compaction_config: &CompactionConfig,
1466    ) -> bool {
1467        l0_file_count
1468            > compaction_config
1469                .emergency_level0_sst_file_count
1470                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
1471                as usize
1472    }
1473
1474    pub fn emergency_l0_partition_count(
1475        last_l0_sub_level_partition_count: usize,
1476        compaction_config: &CompactionConfig,
1477    ) -> bool {
1478        last_l0_sub_level_partition_count
1479            > compaction_config
1480                .emergency_level0_sub_level_partition
1481                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
1482                as usize
1483    }
1484
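    /// Checks only the write-stop conditions: L0 sub-level count, L0 SST count, and L0 total
    /// size, in that order.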
1485    pub fn check_single_group_write_stop(
1486        levels: &Levels,
1487        compaction_config: &CompactionConfig,
1488    ) -> GroupState {
1489        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
1490            return GroupState::WriteStop(format!(
1491                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
1492                levels.l0.sub_levels.len(),
1493                compaction_config.level0_stop_write_threshold_sub_level_number
1494            ));
1495        }
1496
        let l0_sst_count: usize = levels
            .l0
            .sub_levels
            .iter()
            .map(|l| l.table_infos.len())
            .sum();
        if Self::write_stop_l0_file_count(l0_sst_count, compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                l0_sst_count,
                compaction_config
                    .level0_stop_write_threshold_max_sst_count
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
            ));
        }
1519
1520        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
1521            return GroupState::WriteStop(format!(
1522                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
1523                levels.l0.total_file_size,
1524                compaction_config
1525                    .level0_stop_write_threshold_max_size
1526                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
1527            ));
1528        }
1529
1530        GroupState::Normal
1531    }
1532
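    /// Checks only the emergency conditions: total L0 SST count, then the SST count of the
    /// first L0 sub-level.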
1533    pub fn check_single_group_emergency(
1534        levels: &Levels,
1535        compaction_config: &CompactionConfig,
1536    ) -> GroupState {
        let l0_sst_count: usize = levels
            .l0
            .sub_levels
            .iter()
            .map(|l| l.table_infos.len())
            .sum();
        if Self::emergency_l0_file_count(l0_sst_count, compaction_config) {
            return GroupState::Emergency(format!(
                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                l0_sst_count,
                compaction_config
                    .emergency_level0_sst_file_count
                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
            ));
        }
1559
        let l0_partition_count = levels
            .l0
            .sub_levels
            .first()
            .map(|l| l.table_infos.len())
            .unwrap_or(0);
        if Self::emergency_l0_partition_count(l0_partition_count, compaction_config) {
            return GroupState::Emergency(format!(
                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
                l0_partition_count,
                compaction_config
                    .emergency_level0_sub_level_partition
                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
            ));
        }
1582
1583        GroupState::Normal
1584    }
1585
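    /// Combined verdict for a group: write-stop conditions take precedence over emergency
    /// conditions; `Normal` otherwise.
    ///
    /// A minimal illustrative sketch (assumes `levels: &Levels` and `config: &CompactionConfig`
    /// for the group are in scope):
    ///
    /// ```ignore
    /// match GroupStateValidator::group_state(levels, config) {
    ///     GroupState::WriteStop(reason) => tracing::warn!(%reason, "group write-stopped"),
    ///     GroupState::Emergency(reason) => tracing::warn!(%reason, "group in emergency state"),
    ///     GroupState::Normal => {}
    /// }
    /// ```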
1586    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
1587        let state = Self::check_single_group_write_stop(levels, compaction_config);
1588        if state.is_write_stop() {
1589            return state;
1590        }
1591
1592        Self::check_single_group_emergency(levels, compaction_config)
1593    }
1594}