1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17use std::sync::atomic::Ordering;
18use std::time::{SystemTime, UNIX_EPOCH};
19
20use itertools::{Itertools, enumerate};
21use prometheus::IntGauge;
22use prometheus::core::{AtomicU64, Collector, GenericCounter, MetricVec, MetricVecBuilder};
23use risingwave_common::id::{JobId, TableId};
24use risingwave_hummock_sdk::change_log::TableChangeLog;
25use risingwave_hummock_sdk::compact_task::CompactTask;
26use risingwave_hummock_sdk::compaction_group::hummock_version_ext::version_object_size_map;
27use risingwave_hummock_sdk::level::Levels;
28use risingwave_hummock_sdk::table_stats::PbTableStatsMap;
29use risingwave_hummock_sdk::version::HummockVersion;
30use risingwave_hummock_sdk::{
31 CompactionGroupId, HummockContextId, HummockObjectId, HummockVersionId,
32};
33use risingwave_pb::hummock::write_limits::WriteLimit;
34use risingwave_pb::hummock::{
35 CompactionConfig, HummockPinnedVersion, HummockVersionStats, LevelType,
36};
37
38use super::compaction::selector::DynamicLevelSelectorCore;
39use super::compaction::{CompactionDeveloperConfig, get_compression_algorithm};
40use crate::hummock::checkpoint::HummockVersionCheckpoint;
41use crate::hummock::compaction::CompactStatus;
42use crate::rpc::metrics::MetaMetrics;
43
/// Cached per-table Prometheus series handles, so that label lookups on the
/// metric vecs are not repeated on every stats update.
pub struct LocalTableMetrics {
    total_key_count: IntGauge,
    total_key_size: IntGauge,
    total_value_size: IntGauge,
    write_throughput: GenericCounter<AtomicU64>,
    // Number of `inc_write_throughput` calls accumulated since the last flush.
    cal_count: usize,
    // Bytes accumulated since the last flush to `write_throughput`.
    write_size: u64,
}
52
// Thresholds for batching write-throughput updates: the accumulated bytes are
// flushed to the Prometheus counter once either the call count or the data
// size exceeds its threshold (see `LocalTableMetrics::inc_write_throughput`).
const MIN_FLUSH_COUNT: usize = 16;
const MIN_FLUSH_DATA_SIZE: u64 = 128 * 1024 * 1024;
// Label name used to locate a compaction group's series when removing metrics.
const COMPACTION_GROUP_LABEL_NAME: &str = "group";
56
57impl LocalTableMetrics {
58 pub fn inc_write_throughput(&mut self, val: u64) {
59 self.write_size += val;
60 self.cal_count += 1;
61 if self.write_size > MIN_FLUSH_DATA_SIZE || self.cal_count > MIN_FLUSH_COUNT {
62 self.write_throughput.inc_by(self.write_size / 1024 / 1024);
63 self.write_size = 0;
64 self.cal_count = 0;
65 }
66 }
67}
68
69pub fn get_or_create_local_table_stat<'a>(
70 metrics: &MetaMetrics,
71 table_id: TableId,
72 local_metrics: &'a mut HashMap<TableId, LocalTableMetrics>,
73) -> &'a mut LocalTableMetrics {
74 local_metrics.entry(table_id).or_insert_with(|| {
75 let table_label = format!("{}", table_id);
76 LocalTableMetrics {
77 total_key_count: metrics
78 .version_stats
79 .with_label_values(&[table_label.as_str(), "total_key_count"]),
80 total_key_size: metrics
81 .version_stats
82 .with_label_values(&[table_label.as_str(), "total_key_size"]),
83 total_value_size: metrics
84 .version_stats
85 .with_label_values(&[table_label.as_str(), "total_value_size"]),
86 write_throughput: metrics
87 .table_write_throughput
88 .with_label_values(&[&table_label]),
89 cal_count: 0,
90 write_size: 0,
91 }
92 })
93}
94
95pub fn trigger_local_table_stat(
96 metrics: &MetaMetrics,
97 local_metrics: &mut HashMap<TableId, LocalTableMetrics>,
98 version_stats: &HummockVersionStats,
99 table_stats_change: &PbTableStatsMap,
100) {
101 for (table_id, stats) in table_stats_change {
102 if stats.total_key_size == 0 && stats.total_value_size == 0 && stats.total_key_count == 0 {
103 continue;
104 }
105 let table_metrics = get_or_create_local_table_stat(metrics, *table_id, local_metrics);
106 if let Some(table_stats) = version_stats.table_stats.get(table_id) {
107 table_metrics
108 .total_key_count
109 .set(table_stats.total_key_count);
110 table_metrics
111 .total_value_size
112 .set(table_stats.total_value_size);
113 table_metrics.total_key_size.set(table_stats.total_key_size);
114 }
115 }
116}
117
118pub fn trigger_mv_stat(
119 metrics: &MetaMetrics,
120 version_stats: &HummockVersionStats,
121 mv_id_to_all_table_ids: Vec<(JobId, Vec<TableId>)>,
122) {
123 metrics.materialized_view_stats.reset();
124 for (mv_id, all_table_ids) in mv_id_to_all_table_ids {
125 let total_size = all_table_ids
126 .iter()
127 .filter_map(|&table_id| version_stats.table_stats.get(&table_id))
128 .map(|stats| stats.total_key_size + stats.total_value_size)
129 .sum();
130
131 metrics
132 .materialized_view_stats
133 .with_label_values(&[mv_id.to_string().as_str(), "materialized_view_total_size"])
134 .set(total_size);
135 }
136}
137
/// Publishes per-level SST statistics for one compaction group of
/// `current_version`: SST counts and sizes per level, pending-compaction
/// counters, the L0 sub-level breakdown, and a rate-limited per-level log line.
pub fn trigger_sst_stat(
    metrics: &MetaMetrics,
    compact_status: Option<&CompactStatus>,
    current_version: &HummockVersion,
    compaction_group_id: CompactionGroupId,
) {
    // Number of SSTs in the level with index `level_idx` of this group.
    let level_sst_cnt = |level_idx: usize| {
        let mut sst_num = 0;
        current_version.level_iter(compaction_group_id, |level| {
            if level.level_idx == level_idx as u32 {
                sst_num += level.table_infos.len();
            }
            true
        });
        sst_num
    };
    // Total file size (in KB) of the level with index `level_idx`.
    let level_sst_size = |level_idx: usize| {
        let mut level_sst_size = 0;
        current_version.level_iter(compaction_group_id, |level| {
            if level.level_idx == level_idx as u32 {
                level_sst_size += level.total_file_size;
            }
            true
        });
        level_sst_size / 1024
    };

    // (select_level, target_level) -> number of distinct pending tasks.
    let mut compacting_task_stat: BTreeMap<(usize, usize), usize> = BTreeMap::default();
    for idx in 0..current_version.num_levels(compaction_group_id) {
        let sst_num = level_sst_cnt(idx);
        let level_label = build_level_metrics_label(compaction_group_id, idx);
        metrics
            .level_sst_num
            .with_label_values(&[&level_label])
            .set(sst_num as i64);
        metrics
            .level_file_size
            .with_label_values(&[&level_label])
            .set(level_sst_size(idx) as i64);
        if let Some(compact_status) = compact_status {
            let compact_cnt = compact_status.level_handlers[idx].pending_file_count();
            metrics
                .level_compact_cnt
                .with_label_values(&[&level_label])
                .set(compact_cnt as i64);

            let compacting_task = compact_status.level_handlers[idx].pending_tasks();
            let mut pending_task_ids: HashSet<u64> = HashSet::default();
            for task in compacting_task {
                // A task may show up under several handlers; count it once.
                if pending_task_ids.contains(&task.task_id) {
                    continue;
                }

                // For non-L0 levels, skip the entry where the level is the
                // task's own target so the task is attributed to its select
                // level only.
                if idx != 0 && idx == task.target_level as usize {
                    continue;
                }

                let key = (idx, task.target_level as usize);
                let count = compacting_task_stat.entry(key).or_insert(0);
                *count += 1;

                pending_task_ids.insert(task.task_id);
            }
        }
    }

    tracing::debug!("LSM Compacting STAT {:?}", compacting_task_stat);
    for ((select, target), compacting_task_count) in &compacting_task_stat {
        let label_str =
            build_compact_task_stat_metrics_label(compaction_group_id, *select, *target);
        metrics
            .level_compact_task_cnt
            .with_label_values(&[&label_str])
            .set(*compacting_task_count as _);
    }

    // No pending tasks at all: clear every previously published task gauge so
    // stale series do not keep their last value.
    if compacting_task_stat.is_empty()
        && let Some(levels) = current_version.levels.get(&compaction_group_id)
    {
        let max_level = levels.levels.len();
        remove_compacting_task_stat(metrics, compaction_group_id, max_level);
    }

    {
        // L0 sub-level breakdown: overlapping, non-overlapping, and
        // vnode-partitioned non-overlapping sub-level counts.
        let overlapping_level_label = L0SubLevelMetricKind::Overlapping.label(compaction_group_id);
        let non_overlap_level_label =
            L0SubLevelMetricKind::NonOverlapping.label(compaction_group_id);
        let partition_level_label = L0SubLevelMetricKind::VnodePartition.label(compaction_group_id);

        let overlapping_sst_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| sub_level.level_type == LevelType::Overlapping)
                    .count()
            })
            .unwrap_or(0);

        let non_overlap_sst_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| sub_level.level_type == LevelType::Nonoverlapping)
                    .count()
            })
            .unwrap_or(0);

        let partition_level_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| {
                        sub_level.level_type == LevelType::Nonoverlapping
                            && sub_level.vnode_partition_count > 0
                    })
                    .count()
            })
            .unwrap_or(0);
        metrics
            .level_sst_num
            .with_label_values(&[&overlapping_level_label])
            .set(overlapping_sst_num as i64);

        metrics
            .level_sst_num
            .with_label_values(&[&non_overlap_level_label])
            .set(non_overlap_sst_num as i64);

        metrics
            .level_sst_num
            .with_label_values(&[&partition_level_label])
            .set(partition_level_num as i64);
    }

    // Rate-limited (at most every 600s) per-level summary log. The CAS on the
    // shared timestamp ensures only one caller wins the slot per window.
    let previous_time = metrics.time_after_last_observation.load(Ordering::Relaxed);
    let current_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    if current_time > 600 + previous_time
        && metrics
            .time_after_last_observation
            .compare_exchange(
                previous_time,
                current_time,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_ok()
        && let Some(compact_status) = compact_status
    {
        for (idx, level_handler) in enumerate(compact_status.level_handlers.iter()) {
            let sst_num = level_sst_cnt(idx);
            let sst_size = level_sst_size(idx);
            let compact_cnt = level_handler.pending_file_count();
            tracing::info!(
                "Level {} has {} SSTs, the total size of which is {}KB, while {} of those are being compacted to bottom levels",
                idx,
                sst_num,
                sst_size,
                compact_cnt,
            );
        }
    }
}
316
317pub fn trigger_epoch_stat(metrics: &MetaMetrics, version: &HummockVersion) {
318 metrics.max_committed_epoch.set(
319 version
320 .state_table_info
321 .info()
322 .values()
323 .map(|i| i.committed_epoch)
324 .max()
325 .unwrap_or(0) as _,
326 );
327 metrics.min_committed_epoch.set(
328 version
329 .state_table_info
330 .info()
331 .values()
332 .map(|i| i.committed_epoch)
333 .min()
334 .unwrap_or(0) as _,
335 );
336}
337
338pub fn remove_compaction_group_metrics(
339 metrics: &MetaMetrics,
340 compaction_group_id: CompactionGroupId,
341 max_level: usize,
342) {
343 let group_label = compaction_group_id.to_string();
344 let mut idx = 0;
345 loop {
346 let level_label = build_level_metrics_label(compaction_group_id, idx);
347 let should_continue = metrics
348 .level_sst_num
349 .remove_label_values(&[&level_label])
350 .is_ok();
351 metrics
352 .level_file_size
353 .remove_label_values(&[&level_label])
354 .ok();
355 metrics
356 .level_compact_cnt
357 .remove_label_values(&[&level_label])
358 .ok();
359 if !should_continue {
360 break;
361 }
362 idx += 1;
363 }
364
365 for kind in L0SubLevelMetricKind::ALL {
366 let level_label = kind.label(compaction_group_id);
367 metrics
368 .level_sst_num
369 .remove_label_values(&[&level_label])
370 .ok();
371 }
372
373 remove_compacting_task_stat(metrics, compaction_group_id, max_level);
374 remove_compact_skip_frequency(metrics, compaction_group_id, max_level);
375 remove_lsm_stat(metrics, &group_label);
376 remove_split_stat(metrics, compaction_group_id);
377 remove_compact_task_metrics(metrics, &group_label);
378 remove_compaction_group_stat(metrics, compaction_group_id);
379}
380
381fn remove_metric_series_with_label<T>(
382 metric_vec: &MetricVec<T>,
383 target_label_name: &str,
384 target_label_value: &str,
385) where
386 T: MetricVecBuilder,
387{
388 for metric_family in metric_vec.collect() {
389 for metric in metric_family.get_metric() {
390 let labels = metric.get_label();
391 if labels.iter().any(|label| {
392 label.name() == target_label_name && label.value() == target_label_value
393 }) {
394 let labels: HashMap<_, _> = labels
395 .iter()
396 .map(|label| (label.name(), label.value()))
397 .collect();
398 metric_vec.remove(&labels).ok();
399 }
400 }
401 }
402}
403
404fn remove_lsm_stat(metrics: &MetaMetrics, group_label: &str) {
405 metrics
406 .write_stop_compaction_groups
407 .remove_label_values(&[group_label])
408 .ok();
409 metrics
410 .compact_pending_bytes
411 .remove_label_values(&[group_label])
412 .ok();
413
414 remove_metric_series_with_label(
415 &metrics.compact_frequency,
416 COMPACTION_GROUP_LABEL_NAME,
417 group_label,
418 );
419 remove_metric_series_with_label(
420 &metrics.compact_level_compression_ratio,
421 COMPACTION_GROUP_LABEL_NAME,
422 group_label,
423 );
424}
425
426fn remove_compact_skip_frequency(
427 metrics: &MetaMetrics,
428 compaction_group_id: CompactionGroupId,
429 max_level: usize,
430) {
431 for start_level in 0..=max_level {
432 for target_level in 0..=max_level {
433 let level_label = format!(
434 "cg{}-{}-to-{}",
435 compaction_group_id, start_level, target_level
436 );
437 for skip_type in [
438 "write-amp",
439 "count",
440 "pending-files",
441 "overlapping",
442 "picker",
443 ] {
444 metrics
445 .compact_skip_frequency
446 .remove_label_values(&[level_label.as_str(), skip_type])
447 .ok();
448 }
449 }
450 }
451}
452
453pub fn remove_compacting_task_stat(
454 metrics: &MetaMetrics,
455 compaction_group_id: CompactionGroupId,
456 max_level: usize,
457) {
458 for select_level in 0..=max_level {
459 for target_level in 0..=max_level {
460 let label_str = build_compact_task_stat_metrics_label(
461 compaction_group_id,
462 select_level,
463 target_level,
464 );
465 metrics
466 .level_compact_task_cnt
467 .remove_label_values(&[&label_str])
468 .ok();
469 }
470 }
471}
472
473pub fn remove_split_stat(metrics: &MetaMetrics, compaction_group_id: CompactionGroupId) {
474 let label_str = compaction_group_id.to_string();
475 metrics
476 .state_table_count
477 .remove_label_values(&[&label_str])
478 .ok();
479
480 metrics
481 .branched_sst_count
482 .remove_label_values(&[&label_str])
483 .ok();
484}
485
486pub fn remove_compaction_group_stat(metrics: &MetaMetrics, compaction_group_id: CompactionGroupId) {
487 let label_str = compaction_group_id.to_string();
488 metrics
489 .compaction_group_size
490 .remove_label_values(&[&label_str])
491 .ok();
492 metrics
493 .compaction_group_file_count
494 .remove_label_values(&[&label_str])
495 .ok();
496 metrics
497 .compaction_group_throughput
498 .remove_label_values(&[&label_str])
499 .ok();
500
501 metrics
502 .split_compaction_group_count
503 .remove_label_values(&[&label_str])
504 .ok();
505
506 metrics
507 .merge_compaction_group_count
508 .remove_label_values(&[&label_str])
509 .ok();
510}
511
512pub fn trigger_pin_unpin_version_state(
513 metrics: &MetaMetrics,
514 pinned_versions: &BTreeMap<HummockContextId, HummockPinnedVersion>,
515) {
516 if let Some(m) = pinned_versions.values().map(|v| v.min_pinned_id).min() {
517 metrics.min_pinned_version_id.set(m.as_i64_id());
518 } else {
519 metrics.min_pinned_version_id.set(u64::MAX as _);
520 }
521}
522
/// Publishes GC-related object statistics derived from the latest checkpoint:
/// size/count of objects referenced by the current version (including table
/// change logs), objects retained only by still-pinned older versions, stale
/// objects eligible for deletion, and per-table change-log usage.
pub fn trigger_gc_stat(
    metrics: &MetaMetrics,
    checkpoint: &HummockVersionCheckpoint,
    min_pinned_version_id: HummockVersionId,
    current_table_change_log: &HashMap<TableId, TableChangeLog>,
) {
    // Objects referenced by the checkpoint version itself...
    let mut current_version_object_size_map: HashMap<_, _> =
        version_object_size_map(&checkpoint.version);
    // ...plus SSTs referenced by table change logs.
    current_version_object_size_map.extend(current_table_change_log.values().flat_map(
        |change_log| {
            change_log.iter().flat_map(|s| {
                s.change_log_ssts()
                    .map(|s| (HummockObjectId::Sstable(s.object_id), s.file_size))
            })
        },
    ));
    let current_version_object_size = current_version_object_size_map.values().sum::<u64>();
    let current_version_object_count = current_version_object_size_map.len();
    let mut old_version_object_size = 0;
    let mut old_version_object_count = 0;
    let mut stale_object_size = 0;
    let mut stale_object_count = 0;
    // Stale objects of versions at or below the minimum pinned version can be
    // reclaimed; the rest are still reachable from pinned (older) versions.
    checkpoint.stale_objects.iter().for_each(|(id, objects)| {
        if *id <= min_pinned_version_id {
            stale_object_size += objects.total_file_size;
            stale_object_count += objects.id.len() as u64;
        } else {
            old_version_object_size += objects.total_file_size;
            old_version_object_count += objects.id.len() as u64;
        }
    });
    metrics
        .current_version_object_size
        .set(current_version_object_size as _);
    metrics
        .current_version_object_count
        .set(current_version_object_count as _);
    metrics
        .old_version_object_size
        .set(old_version_object_size as _);
    metrics
        .old_version_object_count
        .set(old_version_object_count as _);
    metrics.stale_object_size.set(stale_object_size as _);
    metrics.stale_object_count.set(stale_object_count as _);
    // Per-table change-log footprint: object count, total size, and the
    // minimum epoch still retained in the log.
    for (table_id, logs) in current_table_change_log {
        let table_id_label = table_id.to_string();
        let labels = [table_id_label.as_str()];
        let object_count = logs
            .iter()
            .map(|l| l.old_value.len() + l.new_value.len())
            .sum::<usize>();
        let object_size = logs
            .iter()
            .map(|l| {
                l.old_value
                    .iter()
                    .chain(l.new_value.iter())
                    .map(|s| s.file_size as usize)
                    .sum::<usize>()
            })
            .sum::<usize>();
        metrics
            .table_change_log_object_count
            .with_label_values(&labels)
            .set(object_count as _);
        metrics
            .table_change_log_object_size
            .with_label_values(&labels)
            .set(object_size as _);
        let min_epoch = logs.epochs().min().unwrap_or_default();
        metrics
            .table_change_log_min_epoch
            .with_label_values(&labels)
            .set(min_epoch as _);
    }
}
603
604pub fn trigger_lsm_stat(
606 metrics: &MetaMetrics,
607 compaction_config: Arc<CompactionConfig>,
608 levels: &Levels,
609 compaction_group_id: CompactionGroupId,
610) {
611 let group_label = compaction_group_id.to_string();
612 let dynamic_level_core = DynamicLevelSelectorCore::new(
615 compaction_config.clone(),
616 Arc::new(CompactionDeveloperConfig::default()),
617 );
618 let ctx = dynamic_level_core.calculate_level_base_size(levels);
619 {
620 let compact_pending_bytes_needed =
621 dynamic_level_core.compact_pending_bytes_needed_with_ctx(levels, &ctx);
622
623 metrics
624 .compact_pending_bytes
625 .with_label_values(&[&group_label])
626 .set(compact_pending_bytes_needed as _);
627 }
628
629 {
630 let level_compression_ratio = levels
632 .levels
633 .iter()
634 .map(|level| {
635 let ratio = if level.uncompressed_file_size == 0 {
636 0.0
637 } else {
638 level.total_file_size as f64 / level.uncompressed_file_size as f64
639 };
640
641 (level.level_idx, ratio)
642 })
643 .collect_vec();
644
645 for (level_index, compression_ratio) in level_compression_ratio {
646 let compression_algorithm_label = get_compression_algorithm(
647 compaction_config.as_ref(),
648 ctx.base_level,
649 level_index as usize,
650 );
651
652 metrics
653 .compact_level_compression_ratio
654 .with_label_values(&[
655 &group_label,
656 &level_index.to_string(),
657 &compression_algorithm_label,
658 ])
659 .set(compression_ratio);
660 }
661 }
662}
663
664pub fn trigger_write_stop_stats(
665 metrics: &MetaMetrics,
666 write_limit: &HashMap<CompactionGroupId, WriteLimit>,
667) {
668 metrics.write_stop_compaction_groups.reset();
669 for cg in write_limit.keys() {
670 metrics
671 .write_stop_compaction_groups
672 .with_label_values(&[&cg.to_string()])
673 .set(1);
674 }
675}
676
677pub fn trigger_split_stat(metrics: &MetaMetrics, version: &HummockVersion) {
678 let branched_ssts = version.build_branched_sst_info();
679
680 for compaction_group_id in version.levels.keys() {
681 let group_label = compaction_group_id.to_string();
682 metrics
683 .state_table_count
684 .with_label_values(&[&group_label])
685 .set(
686 version
687 .state_table_info
688 .compaction_group_member_table_ids(*compaction_group_id)
689 .len() as _,
690 );
691
692 let branched_sst_count: usize = branched_ssts
693 .values()
694 .map(|branched_map| {
695 branched_map
696 .keys()
697 .filter(|group_id| *group_id == compaction_group_id)
698 .count()
699 })
700 .sum();
701
702 metrics
703 .branched_sst_count
704 .with_label_values(&[&group_label])
705 .set(branched_sst_count as _);
706 }
707}
708
709fn build_level_metrics_label(compaction_group_id: CompactionGroupId, level_idx: usize) -> String {
710 format!("cg{}_L{}", compaction_group_id, level_idx)
711}
712
713fn build_compact_task_stat_metrics_label(
714 compaction_group_id: CompactionGroupId,
715 select_level: usize,
716 target_level: usize,
717) -> String {
718 format!(
719 "cg{} L{} -> L{}",
720 compaction_group_id, select_level, target_level
721 )
722}
723
724#[derive(Clone, Copy)]
725enum L0SubLevelMetricKind {
726 Overlapping,
727 NonOverlapping,
728 VnodePartition,
729}
730
731impl L0SubLevelMetricKind {
732 const ALL: [Self; 3] = [
733 Self::Overlapping,
734 Self::NonOverlapping,
735 Self::VnodePartition,
736 ];
737
738 fn label(self, compaction_group_id: CompactionGroupId) -> String {
739 let suffix = match self {
740 Self::Overlapping => "l0_sub_overlapping",
741 Self::NonOverlapping => "l0_sub_non_overlap",
742 Self::VnodePartition => "l0_sub_partition",
743 };
744 format!("cg{}_{}", compaction_group_id, suffix)
745 }
746}
747
/// Builds the level-transition label used by compact-task metrics,
/// e.g. `L0->L6`.
pub fn build_compact_task_level_type_metrics_label(
    select_level: usize,
    target_level: usize,
) -> String {
    format!("L{select_level}->L{target_level}")
}
754
/// Removes the compact-task metric series (L0 compact level count, task size,
/// task file count, trivial-move SST count) that carry this group's label.
/// These vecs have labels besides the group, so the series are swept by the
/// `group` label pair rather than removed by an exact label set.
fn remove_compact_task_metrics(metrics: &MetaMetrics, group_label: &str) {
    remove_metric_series_with_label(
        &metrics.l0_compact_level_count,
        COMPACTION_GROUP_LABEL_NAME,
        group_label,
    );
    remove_metric_series_with_label(
        &metrics.compact_task_size,
        COMPACTION_GROUP_LABEL_NAME,
        group_label,
    );
    remove_metric_series_with_label(
        &metrics.compact_task_file_count,
        COMPACTION_GROUP_LABEL_NAME,
        group_label,
    );
    remove_metric_series_with_label(
        &metrics.compact_task_trivial_move_sst_count,
        COMPACTION_GROUP_LABEL_NAME,
        group_label,
    );
}
777
778pub fn trigger_compact_tasks_stat(
779 metrics: &MetaMetrics,
780 compact_tasks: &[CompactTask],
781 compact_status: &BTreeMap<CompactionGroupId, CompactStatus>,
782 current_version: &HummockVersion,
783) {
784 let mut task_status_label_map = HashMap::new();
785 let mut task_type_label_map = HashMap::new();
786 let mut group_label_map = HashMap::new();
787
788 for task in compact_tasks {
789 let task_status_label = task_status_label_map
790 .entry(task.task_status)
791 .or_insert_with(|| task.task_status.as_str_name().to_owned());
792
793 let task_type_label = task_type_label_map
794 .entry(task.task_type)
795 .or_insert_with(|| task.task_type.as_str_name().to_owned());
796
797 let group_label = group_label_map
798 .entry(task.compaction_group_id)
799 .or_insert_with(|| task.compaction_group_id.to_string());
800
801 metrics
802 .compact_frequency
803 .with_label_values(&["normal", group_label, task_type_label, task_status_label])
804 .inc();
805 }
806
807 group_label_map.keys().for_each(|group_id| {
808 trigger_sst_stat(
809 metrics,
810 compact_status.get(group_id),
811 current_version,
812 *group_id,
813 );
814 });
815}