risingwave_meta/hummock/
metrics_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17use std::sync::atomic::Ordering;
18use std::time::{SystemTime, UNIX_EPOCH};
19
20use itertools::{Itertools, enumerate};
21use prometheus::IntGauge;
22use prometheus::core::{AtomicU64, Collector, GenericCounter, MetricVec, MetricVecBuilder};
23use risingwave_common::id::{JobId, TableId};
24use risingwave_hummock_sdk::change_log::TableChangeLog;
25use risingwave_hummock_sdk::compact_task::CompactTask;
26use risingwave_hummock_sdk::compaction_group::hummock_version_ext::version_object_size_map;
27use risingwave_hummock_sdk::level::Levels;
28use risingwave_hummock_sdk::table_stats::PbTableStatsMap;
29use risingwave_hummock_sdk::version::HummockVersion;
30use risingwave_hummock_sdk::{
31    CompactionGroupId, HummockContextId, HummockObjectId, HummockVersionId,
32};
33use risingwave_pb::hummock::write_limits::WriteLimit;
34use risingwave_pb::hummock::{
35    CompactionConfig, HummockPinnedVersion, HummockVersionStats, LevelType,
36};
37
38use super::compaction::selector::DynamicLevelSelectorCore;
39use super::compaction::{CompactionDeveloperConfig, get_compression_algorithm};
40use crate::hummock::checkpoint::HummockVersionCheckpoint;
41use crate::hummock::compaction::CompactStatus;
42use crate::rpc::metrics::MetaMetrics;
43
44pub struct LocalTableMetrics {
45    total_key_count: IntGauge,
46    total_key_size: IntGauge,
47    total_value_size: IntGauge,
48    write_throughput: GenericCounter<AtomicU64>,
49    cal_count: usize,
50    write_size: u64,
51}
52
53const MIN_FLUSH_COUNT: usize = 16;
54const MIN_FLUSH_DATA_SIZE: u64 = 128 * 1024 * 1024;
55const COMPACTION_GROUP_LABEL_NAME: &str = "group";
56
57impl LocalTableMetrics {
58    pub fn inc_write_throughput(&mut self, val: u64) {
59        self.write_size += val;
60        self.cal_count += 1;
61        if self.write_size > MIN_FLUSH_DATA_SIZE || self.cal_count > MIN_FLUSH_COUNT {
62            self.write_throughput.inc_by(self.write_size / 1024 / 1024);
63            self.write_size = 0;
64            self.cal_count = 0;
65        }
66    }
67}
68
69pub fn get_or_create_local_table_stat<'a>(
70    metrics: &MetaMetrics,
71    table_id: TableId,
72    local_metrics: &'a mut HashMap<TableId, LocalTableMetrics>,
73) -> &'a mut LocalTableMetrics {
74    local_metrics.entry(table_id).or_insert_with(|| {
75        let table_label = format!("{}", table_id);
76        LocalTableMetrics {
77            total_key_count: metrics
78                .version_stats
79                .with_label_values(&[table_label.as_str(), "total_key_count"]),
80            total_key_size: metrics
81                .version_stats
82                .with_label_values(&[table_label.as_str(), "total_key_size"]),
83            total_value_size: metrics
84                .version_stats
85                .with_label_values(&[table_label.as_str(), "total_value_size"]),
86            write_throughput: metrics
87                .table_write_throughput
88                .with_label_values(&[&table_label]),
89            cal_count: 0,
90            write_size: 0,
91        }
92    })
93}
94
95pub fn trigger_local_table_stat(
96    metrics: &MetaMetrics,
97    local_metrics: &mut HashMap<TableId, LocalTableMetrics>,
98    version_stats: &HummockVersionStats,
99    table_stats_change: &PbTableStatsMap,
100) {
101    for (table_id, stats) in table_stats_change {
102        if stats.total_key_size == 0 && stats.total_value_size == 0 && stats.total_key_count == 0 {
103            continue;
104        }
105        let table_metrics = get_or_create_local_table_stat(metrics, *table_id, local_metrics);
106        if let Some(table_stats) = version_stats.table_stats.get(table_id) {
107            table_metrics
108                .total_key_count
109                .set(table_stats.total_key_count);
110            table_metrics
111                .total_value_size
112                .set(table_stats.total_value_size);
113            table_metrics.total_key_size.set(table_stats.total_key_size);
114        }
115    }
116}
117
118pub fn trigger_mv_stat(
119    metrics: &MetaMetrics,
120    version_stats: &HummockVersionStats,
121    mv_id_to_all_table_ids: Vec<(JobId, Vec<TableId>)>,
122) {
123    metrics.materialized_view_stats.reset();
124    for (mv_id, all_table_ids) in mv_id_to_all_table_ids {
125        let total_size = all_table_ids
126            .iter()
127            .filter_map(|&table_id| version_stats.table_stats.get(&table_id))
128            .map(|stats| stats.total_key_size + stats.total_value_size)
129            .sum();
130
131        metrics
132            .materialized_view_stats
133            .with_label_values(&[mv_id.to_string().as_str(), "materialized_view_total_size"])
134            .set(total_size);
135    }
136}
137
138pub fn trigger_sst_stat(
139    metrics: &MetaMetrics,
140    compact_status: Option<&CompactStatus>,
141    current_version: &HummockVersion,
142    compaction_group_id: CompactionGroupId,
143) {
144    let level_sst_cnt = |level_idx: usize| {
145        let mut sst_num = 0;
146        current_version.level_iter(compaction_group_id, |level| {
147            if level.level_idx == level_idx as u32 {
148                sst_num += level.table_infos.len();
149            }
150            true
151        });
152        sst_num
153    };
154    let level_sst_size = |level_idx: usize| {
155        let mut level_sst_size = 0;
156        current_version.level_iter(compaction_group_id, |level| {
157            if level.level_idx == level_idx as u32 {
158                level_sst_size += level.total_file_size;
159            }
160            true
161        });
162        level_sst_size / 1024
163    };
164
165    let mut compacting_task_stat: BTreeMap<(usize, usize), usize> = BTreeMap::default();
166    for idx in 0..current_version.num_levels(compaction_group_id) {
167        let sst_num = level_sst_cnt(idx);
168        let level_label = build_level_metrics_label(compaction_group_id, idx);
169        metrics
170            .level_sst_num
171            .with_label_values(&[&level_label])
172            .set(sst_num as i64);
173        metrics
174            .level_file_size
175            .with_label_values(&[&level_label])
176            .set(level_sst_size(idx) as i64);
177        if let Some(compact_status) = compact_status {
178            let compact_cnt = compact_status.level_handlers[idx].pending_file_count();
179            metrics
180                .level_compact_cnt
181                .with_label_values(&[&level_label])
182                .set(compact_cnt as i64);
183
184            let compacting_task = compact_status.level_handlers[idx].pending_tasks();
185            let mut pending_task_ids: HashSet<u64> = HashSet::default();
186            for task in compacting_task {
187                if pending_task_ids.contains(&task.task_id) {
188                    continue;
189                }
190
191                if idx != 0 && idx == task.target_level as usize {
192                    continue;
193                }
194
195                let key = (idx, task.target_level as usize);
196                let count = compacting_task_stat.entry(key).or_insert(0);
197                *count += 1;
198
199                pending_task_ids.insert(task.task_id);
200            }
201        }
202    }
203
204    tracing::debug!("LSM Compacting STAT {:?}", compacting_task_stat);
205    for ((select, target), compacting_task_count) in &compacting_task_stat {
206        let label_str =
207            build_compact_task_stat_metrics_label(compaction_group_id, *select, *target);
208        metrics
209            .level_compact_task_cnt
210            .with_label_values(&[&label_str])
211            .set(*compacting_task_count as _);
212    }
213
214    if compacting_task_stat.is_empty()
215        && let Some(levels) = current_version.levels.get(&compaction_group_id)
216    {
217        let max_level = levels.levels.len();
218        remove_compacting_task_stat(metrics, compaction_group_id, max_level);
219    }
220
221    {
222        // sub level stat
223        let overlapping_level_label = L0SubLevelMetricKind::Overlapping.label(compaction_group_id);
224        let non_overlap_level_label =
225            L0SubLevelMetricKind::NonOverlapping.label(compaction_group_id);
226        let partition_level_label = L0SubLevelMetricKind::VnodePartition.label(compaction_group_id);
227
228        let overlapping_sst_num = current_version
229            .levels
230            .get(&compaction_group_id)
231            .map(|level| {
232                level
233                    .l0
234                    .sub_levels
235                    .iter()
236                    .filter(|sub_level| sub_level.level_type == LevelType::Overlapping)
237                    .count()
238            })
239            .unwrap_or(0);
240
241        let non_overlap_sst_num = current_version
242            .levels
243            .get(&compaction_group_id)
244            .map(|level| {
245                level
246                    .l0
247                    .sub_levels
248                    .iter()
249                    .filter(|sub_level| sub_level.level_type == LevelType::Nonoverlapping)
250                    .count()
251            })
252            .unwrap_or(0);
253
254        let partition_level_num = current_version
255            .levels
256            .get(&compaction_group_id)
257            .map(|level| {
258                level
259                    .l0
260                    .sub_levels
261                    .iter()
262                    .filter(|sub_level| {
263                        sub_level.level_type == LevelType::Nonoverlapping
264                            && sub_level.vnode_partition_count > 0
265                    })
266                    .count()
267            })
268            .unwrap_or(0);
269        metrics
270            .level_sst_num
271            .with_label_values(&[&overlapping_level_label])
272            .set(overlapping_sst_num as i64);
273
274        metrics
275            .level_sst_num
276            .with_label_values(&[&non_overlap_level_label])
277            .set(non_overlap_sst_num as i64);
278
279        metrics
280            .level_sst_num
281            .with_label_values(&[&partition_level_label])
282            .set(partition_level_num as i64);
283    }
284
285    let previous_time = metrics.time_after_last_observation.load(Ordering::Relaxed);
286    let current_time = SystemTime::now()
287        .duration_since(UNIX_EPOCH)
288        .unwrap()
289        .as_secs();
290    if current_time > 600 + previous_time
291        && metrics
292            .time_after_last_observation
293            .compare_exchange(
294                previous_time,
295                current_time,
296                Ordering::Relaxed,
297                Ordering::Relaxed,
298            )
299            .is_ok()
300        && let Some(compact_status) = compact_status
301    {
302        for (idx, level_handler) in enumerate(compact_status.level_handlers.iter()) {
303            let sst_num = level_sst_cnt(idx);
304            let sst_size = level_sst_size(idx);
305            let compact_cnt = level_handler.pending_file_count();
306            tracing::info!(
307                "Level {} has {} SSTs, the total size of which is {}KB, while {} of those are being compacted to bottom levels",
308                idx,
309                sst_num,
310                sst_size,
311                compact_cnt,
312            );
313        }
314    }
315}
316
317pub fn trigger_epoch_stat(metrics: &MetaMetrics, version: &HummockVersion) {
318    metrics.max_committed_epoch.set(
319        version
320            .state_table_info
321            .info()
322            .values()
323            .map(|i| i.committed_epoch)
324            .max()
325            .unwrap_or(0) as _,
326    );
327    metrics.min_committed_epoch.set(
328        version
329            .state_table_info
330            .info()
331            .values()
332            .map(|i| i.committed_epoch)
333            .min()
334            .unwrap_or(0) as _,
335    );
336}
337
338pub fn remove_compaction_group_metrics(
339    metrics: &MetaMetrics,
340    compaction_group_id: CompactionGroupId,
341    max_level: usize,
342) {
343    let group_label = compaction_group_id.to_string();
344    let mut idx = 0;
345    loop {
346        let level_label = build_level_metrics_label(compaction_group_id, idx);
347        let should_continue = metrics
348            .level_sst_num
349            .remove_label_values(&[&level_label])
350            .is_ok();
351        metrics
352            .level_file_size
353            .remove_label_values(&[&level_label])
354            .ok();
355        metrics
356            .level_compact_cnt
357            .remove_label_values(&[&level_label])
358            .ok();
359        if !should_continue {
360            break;
361        }
362        idx += 1;
363    }
364
365    for kind in L0SubLevelMetricKind::ALL {
366        let level_label = kind.label(compaction_group_id);
367        metrics
368            .level_sst_num
369            .remove_label_values(&[&level_label])
370            .ok();
371    }
372
373    remove_compacting_task_stat(metrics, compaction_group_id, max_level);
374    remove_compact_skip_frequency(metrics, compaction_group_id, max_level);
375    remove_lsm_stat(metrics, &group_label);
376    remove_split_stat(metrics, compaction_group_id);
377    remove_compact_task_metrics(metrics, &group_label);
378    remove_compaction_group_stat(metrics, compaction_group_id);
379}
380
381fn remove_metric_series_with_label<T>(
382    metric_vec: &MetricVec<T>,
383    target_label_name: &str,
384    target_label_value: &str,
385) where
386    T: MetricVecBuilder,
387{
388    for metric_family in metric_vec.collect() {
389        for metric in metric_family.get_metric() {
390            let labels = metric.get_label();
391            if labels.iter().any(|label| {
392                label.name() == target_label_name && label.value() == target_label_value
393            }) {
394                let labels: HashMap<_, _> = labels
395                    .iter()
396                    .map(|label| (label.name(), label.value()))
397                    .collect();
398                metric_vec.remove(&labels).ok();
399            }
400        }
401    }
402}
403
404fn remove_lsm_stat(metrics: &MetaMetrics, group_label: &str) {
405    metrics
406        .write_stop_compaction_groups
407        .remove_label_values(&[group_label])
408        .ok();
409    metrics
410        .compact_pending_bytes
411        .remove_label_values(&[group_label])
412        .ok();
413
414    remove_metric_series_with_label(
415        &metrics.compact_frequency,
416        COMPACTION_GROUP_LABEL_NAME,
417        group_label,
418    );
419    remove_metric_series_with_label(
420        &metrics.compact_level_compression_ratio,
421        COMPACTION_GROUP_LABEL_NAME,
422        group_label,
423    );
424}
425
426fn remove_compact_skip_frequency(
427    metrics: &MetaMetrics,
428    compaction_group_id: CompactionGroupId,
429    max_level: usize,
430) {
431    for start_level in 0..=max_level {
432        for target_level in 0..=max_level {
433            let level_label = format!(
434                "cg{}-{}-to-{}",
435                compaction_group_id, start_level, target_level
436            );
437            for skip_type in [
438                "write-amp",
439                "count",
440                "pending-files",
441                "overlapping",
442                "picker",
443            ] {
444                metrics
445                    .compact_skip_frequency
446                    .remove_label_values(&[level_label.as_str(), skip_type])
447                    .ok();
448            }
449        }
450    }
451}
452
453pub fn remove_compacting_task_stat(
454    metrics: &MetaMetrics,
455    compaction_group_id: CompactionGroupId,
456    max_level: usize,
457) {
458    for select_level in 0..=max_level {
459        for target_level in 0..=max_level {
460            let label_str = build_compact_task_stat_metrics_label(
461                compaction_group_id,
462                select_level,
463                target_level,
464            );
465            metrics
466                .level_compact_task_cnt
467                .remove_label_values(&[&label_str])
468                .ok();
469        }
470    }
471}
472
473pub fn remove_split_stat(metrics: &MetaMetrics, compaction_group_id: CompactionGroupId) {
474    let label_str = compaction_group_id.to_string();
475    metrics
476        .state_table_count
477        .remove_label_values(&[&label_str])
478        .ok();
479
480    metrics
481        .branched_sst_count
482        .remove_label_values(&[&label_str])
483        .ok();
484}
485
486pub fn remove_compaction_group_stat(metrics: &MetaMetrics, compaction_group_id: CompactionGroupId) {
487    let label_str = compaction_group_id.to_string();
488    metrics
489        .compaction_group_size
490        .remove_label_values(&[&label_str])
491        .ok();
492    metrics
493        .compaction_group_file_count
494        .remove_label_values(&[&label_str])
495        .ok();
496    metrics
497        .compaction_group_throughput
498        .remove_label_values(&[&label_str])
499        .ok();
500
501    metrics
502        .split_compaction_group_count
503        .remove_label_values(&[&label_str])
504        .ok();
505
506    metrics
507        .merge_compaction_group_count
508        .remove_label_values(&[&label_str])
509        .ok();
510}
511
512pub fn trigger_pin_unpin_version_state(
513    metrics: &MetaMetrics,
514    pinned_versions: &BTreeMap<HummockContextId, HummockPinnedVersion>,
515) {
516    if let Some(m) = pinned_versions.values().map(|v| v.min_pinned_id).min() {
517        metrics.min_pinned_version_id.set(m.as_i64_id());
518    } else {
519        metrics.min_pinned_version_id.set(u64::MAX as _);
520    }
521}
522
523pub fn trigger_gc_stat(
524    metrics: &MetaMetrics,
525    checkpoint: &HummockVersionCheckpoint,
526    min_pinned_version_id: HummockVersionId,
527    current_table_change_log: &HashMap<TableId, TableChangeLog>,
528) {
529    let mut current_version_object_size_map: HashMap<_, _> =
530        version_object_size_map(&checkpoint.version);
531    // Note: Because table change logs do not support MVCC, their objects are tracked
532    // exclusively in current_version_object, rather than in old_version_object or stale_object.
533    current_version_object_size_map.extend(current_table_change_log.values().flat_map(
534        |change_log| {
535            change_log.iter().flat_map(|s| {
536                s.change_log_ssts()
537                    .map(|s| (HummockObjectId::Sstable(s.object_id), s.file_size))
538            })
539        },
540    ));
541    let current_version_object_size = current_version_object_size_map.values().sum::<u64>();
542    let current_version_object_count = current_version_object_size_map.len();
543    let mut old_version_object_size = 0;
544    let mut old_version_object_count = 0;
545    let mut stale_object_size = 0;
546    let mut stale_object_count = 0;
547    checkpoint.stale_objects.iter().for_each(|(id, objects)| {
548        if *id <= min_pinned_version_id {
549            stale_object_size += objects.total_file_size;
550            stale_object_count += objects.id.len() as u64;
551        } else {
552            old_version_object_size += objects.total_file_size;
553            old_version_object_count += objects.id.len() as u64;
554        }
555    });
556    metrics
557        .current_version_object_size
558        .set(current_version_object_size as _);
559    metrics
560        .current_version_object_count
561        .set(current_version_object_count as _);
562    metrics
563        .old_version_object_size
564        .set(old_version_object_size as _);
565    metrics
566        .old_version_object_count
567        .set(old_version_object_count as _);
568    metrics.stale_object_size.set(stale_object_size as _);
569    metrics.stale_object_count.set(stale_object_count as _);
570    // table change log
571    for (table_id, logs) in current_table_change_log {
572        let table_id_label = table_id.to_string();
573        let labels = [table_id_label.as_str()];
574        let object_count = logs
575            .iter()
576            .map(|l| l.old_value.len() + l.new_value.len())
577            .sum::<usize>();
578        let object_size = logs
579            .iter()
580            .map(|l| {
581                l.old_value
582                    .iter()
583                    .chain(l.new_value.iter())
584                    .map(|s| s.file_size as usize)
585                    .sum::<usize>()
586            })
587            .sum::<usize>();
588        metrics
589            .table_change_log_object_count
590            .with_label_values(&labels)
591            .set(object_count as _);
592        metrics
593            .table_change_log_object_size
594            .with_label_values(&labels)
595            .set(object_size as _);
596        let min_epoch = logs.epochs().min().unwrap_or_default();
597        metrics
598            .table_change_log_min_epoch
599            .with_label_values(&labels)
600            .set(min_epoch as _);
601    }
602}
603
604// Triggers a report on compact_pending_bytes_needed
605pub fn trigger_lsm_stat(
606    metrics: &MetaMetrics,
607    compaction_config: Arc<CompactionConfig>,
608    levels: &Levels,
609    compaction_group_id: CompactionGroupId,
610) {
611    let group_label = compaction_group_id.to_string();
612    // compact_pending_bytes
613    // we don't actually generate a compaction task here so developer config can be ignored.
614    let dynamic_level_core = DynamicLevelSelectorCore::new(
615        compaction_config.clone(),
616        Arc::new(CompactionDeveloperConfig::default()),
617    );
618    let ctx = dynamic_level_core.calculate_level_base_size(levels);
619    {
620        let compact_pending_bytes_needed =
621            dynamic_level_core.compact_pending_bytes_needed_with_ctx(levels, &ctx);
622
623        metrics
624            .compact_pending_bytes
625            .with_label_values(&[&group_label])
626            .set(compact_pending_bytes_needed as _);
627    }
628
629    {
630        // compact_level_compression_ratio
631        let level_compression_ratio = levels
632            .levels
633            .iter()
634            .map(|level| {
635                let ratio = if level.uncompressed_file_size == 0 {
636                    0.0
637                } else {
638                    level.total_file_size as f64 / level.uncompressed_file_size as f64
639                };
640
641                (level.level_idx, ratio)
642            })
643            .collect_vec();
644
645        for (level_index, compression_ratio) in level_compression_ratio {
646            let compression_algorithm_label = get_compression_algorithm(
647                compaction_config.as_ref(),
648                ctx.base_level,
649                level_index as usize,
650            );
651
652            metrics
653                .compact_level_compression_ratio
654                .with_label_values(&[
655                    &group_label,
656                    &level_index.to_string(),
657                    &compression_algorithm_label,
658                ])
659                .set(compression_ratio);
660        }
661    }
662}
663
664pub fn trigger_write_stop_stats(
665    metrics: &MetaMetrics,
666    write_limit: &HashMap<CompactionGroupId, WriteLimit>,
667) {
668    metrics.write_stop_compaction_groups.reset();
669    for cg in write_limit.keys() {
670        metrics
671            .write_stop_compaction_groups
672            .with_label_values(&[&cg.to_string()])
673            .set(1);
674    }
675}
676
677pub fn trigger_split_stat(metrics: &MetaMetrics, version: &HummockVersion) {
678    let branched_ssts = version.build_branched_sst_info();
679
680    for compaction_group_id in version.levels.keys() {
681        let group_label = compaction_group_id.to_string();
682        metrics
683            .state_table_count
684            .with_label_values(&[&group_label])
685            .set(
686                version
687                    .state_table_info
688                    .compaction_group_member_table_ids(*compaction_group_id)
689                    .len() as _,
690            );
691
692        let branched_sst_count: usize = branched_ssts
693            .values()
694            .map(|branched_map| {
695                branched_map
696                    .keys()
697                    .filter(|group_id| *group_id == compaction_group_id)
698                    .count()
699            })
700            .sum();
701
702        metrics
703            .branched_sst_count
704            .with_label_values(&[&group_label])
705            .set(branched_sst_count as _);
706    }
707}
708
709fn build_level_metrics_label(compaction_group_id: CompactionGroupId, level_idx: usize) -> String {
710    format!("cg{}_L{}", compaction_group_id, level_idx)
711}
712
713fn build_compact_task_stat_metrics_label(
714    compaction_group_id: CompactionGroupId,
715    select_level: usize,
716    target_level: usize,
717) -> String {
718    format!(
719        "cg{} L{} -> L{}",
720        compaction_group_id, select_level, target_level
721    )
722}
723
724#[derive(Clone, Copy)]
725enum L0SubLevelMetricKind {
726    Overlapping,
727    NonOverlapping,
728    VnodePartition,
729}
730
731impl L0SubLevelMetricKind {
732    const ALL: [Self; 3] = [
733        Self::Overlapping,
734        Self::NonOverlapping,
735        Self::VnodePartition,
736    ];
737
738    fn label(self, compaction_group_id: CompactionGroupId) -> String {
739        let suffix = match self {
740            Self::Overlapping => "l0_sub_overlapping",
741            Self::NonOverlapping => "l0_sub_non_overlap",
742            Self::VnodePartition => "l0_sub_partition",
743        };
744        format!("cg{}_{}", compaction_group_id, suffix)
745    }
746}
747
748pub fn build_compact_task_level_type_metrics_label(
749    select_level: usize,
750    target_level: usize,
751) -> String {
752    format!("L{}->L{}", select_level, target_level)
753}
754
755fn remove_compact_task_metrics(metrics: &MetaMetrics, group_label: &str) {
756    remove_metric_series_with_label(
757        &metrics.l0_compact_level_count,
758        COMPACTION_GROUP_LABEL_NAME,
759        group_label,
760    );
761    remove_metric_series_with_label(
762        &metrics.compact_task_size,
763        COMPACTION_GROUP_LABEL_NAME,
764        group_label,
765    );
766    remove_metric_series_with_label(
767        &metrics.compact_task_file_count,
768        COMPACTION_GROUP_LABEL_NAME,
769        group_label,
770    );
771    remove_metric_series_with_label(
772        &metrics.compact_task_trivial_move_sst_count,
773        COMPACTION_GROUP_LABEL_NAME,
774        group_label,
775    );
776}
777
778pub fn trigger_compact_tasks_stat(
779    metrics: &MetaMetrics,
780    compact_tasks: &[CompactTask],
781    compact_status: &BTreeMap<CompactionGroupId, CompactStatus>,
782    current_version: &HummockVersion,
783) {
784    let mut task_status_label_map = HashMap::new();
785    let mut task_type_label_map = HashMap::new();
786    let mut group_label_map = HashMap::new();
787
788    for task in compact_tasks {
789        let task_status_label = task_status_label_map
790            .entry(task.task_status)
791            .or_insert_with(|| task.task_status.as_str_name().to_owned());
792
793        let task_type_label = task_type_label_map
794            .entry(task.task_type)
795            .or_insert_with(|| task.task_type.as_str_name().to_owned());
796
797        let group_label = group_label_map
798            .entry(task.compaction_group_id)
799            .or_insert_with(|| task.compaction_group_id.to_string());
800
801        metrics
802            .compact_frequency
803            .with_label_values(&["normal", group_label, task_type_label, task_status_label])
804            .inc();
805    }
806
807    group_label_map.keys().for_each(|group_id| {
808        trigger_sst_stat(
809            metrics,
810            compact_status.get(group_id),
811            current_version,
812            *group_id,
813        );
814    });
815}