risingwave_meta/hummock/
metrics_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17use std::sync::atomic::Ordering;
18use std::time::{SystemTime, UNIX_EPOCH};
19
20use itertools::{Itertools, enumerate};
21use prometheus::IntGauge;
22use prometheus::core::{AtomicU64, GenericCounter};
23use risingwave_hummock_sdk::compact_task::CompactTask;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
25use risingwave_hummock_sdk::level::Levels;
26use risingwave_hummock_sdk::table_stats::PbTableStatsMap;
27use risingwave_hummock_sdk::version::HummockVersion;
28use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId, HummockVersionId};
29use risingwave_pb::hummock::write_limits::WriteLimit;
30use risingwave_pb::hummock::{
31    CompactionConfig, HummockPinnedVersion, HummockVersionStats, LevelType,
32};
33
34use super::compaction::selector::DynamicLevelSelectorCore;
35use super::compaction::{CompactionDeveloperConfig, get_compression_algorithm};
36use crate::hummock::checkpoint::HummockVersionCheckpoint;
37use crate::hummock::compaction::CompactStatus;
38use crate::rpc::metrics::MetaMetrics;
39
/// Per-table metric handles cached locally so that hot paths don't repeat
/// prometheus label lookups, plus a small write-throughput accumulator that
/// is flushed to the counter in batches (see `inc_write_throughput`).
pub struct LocalTableMetrics {
    // Gauges mirroring the table's absolute stats from `HummockVersionStats`.
    total_key_count: IntGauge,
    total_key_size: IntGauge,
    total_value_size: IntGauge,
    // Cumulative written data, reported in MB.
    write_throughput: GenericCounter<AtomicU64>,
    // Number of `inc_write_throughput` calls since the last flush.
    cal_count: usize,
    // Bytes accumulated since the last flush.
    write_size: u64,
}
48
// Thresholds for flushing the locally accumulated write size into the
// `write_throughput` counter: flush once more than this many calls have been
// made, or more than this many bytes have accumulated (128 MiB).
const MIN_FLUSH_COUNT: usize = 16;
const MIN_FLUSH_DATA_SIZE: u64 = 128 * 1024 * 1024;
51
52impl LocalTableMetrics {
53    pub fn inc_write_throughput(&mut self, val: u64) {
54        self.write_size += val;
55        self.cal_count += 1;
56        if self.write_size > MIN_FLUSH_DATA_SIZE || self.cal_count > MIN_FLUSH_COUNT {
57            self.write_throughput.inc_by(self.write_size / 1024 / 1024);
58            self.write_size = 0;
59            self.cal_count = 0;
60        }
61    }
62}
63
64pub fn get_or_create_local_table_stat<'a>(
65    metrics: &MetaMetrics,
66    table_id: u32,
67    local_metrics: &'a mut HashMap<u32, LocalTableMetrics>,
68) -> &'a mut LocalTableMetrics {
69    local_metrics.entry(table_id).or_insert_with(|| {
70        let table_label = format!("{}", table_id);
71        LocalTableMetrics {
72            total_key_count: metrics
73                .version_stats
74                .with_label_values(&[table_label.as_str(), "total_key_count"]),
75            total_key_size: metrics
76                .version_stats
77                .with_label_values(&[table_label.as_str(), "total_key_size"]),
78            total_value_size: metrics
79                .version_stats
80                .with_label_values(&[table_label.as_str(), "total_value_size"]),
81            write_throughput: metrics
82                .table_write_throughput
83                .with_label_values(&[&table_label]),
84            cal_count: 0,
85            write_size: 0,
86        }
87    })
88}
89
90pub fn trigger_local_table_stat(
91    metrics: &MetaMetrics,
92    local_metrics: &mut HashMap<u32, LocalTableMetrics>,
93    version_stats: &HummockVersionStats,
94    table_stats_change: &PbTableStatsMap,
95) {
96    for (table_id, stats) in table_stats_change {
97        if stats.total_key_size == 0 && stats.total_value_size == 0 && stats.total_key_count == 0 {
98            continue;
99        }
100        let table_metrics = get_or_create_local_table_stat(metrics, *table_id, local_metrics);
101        if let Some(table_stats) = version_stats.table_stats.get(table_id) {
102            table_metrics
103                .total_key_count
104                .set(table_stats.total_key_count);
105            table_metrics
106                .total_value_size
107                .set(table_stats.total_value_size);
108            table_metrics.total_key_size.set(table_stats.total_key_size);
109        }
110    }
111}
112
113pub fn trigger_mv_stat(
114    metrics: &MetaMetrics,
115    version_stats: &HummockVersionStats,
116    mv_id_to_all_table_ids: Vec<(u32, Vec<u32>)>,
117) {
118    metrics.materialized_view_stats.reset();
119    for (mv_id, all_table_ids) in mv_id_to_all_table_ids {
120        let total_size = all_table_ids
121            .iter()
122            .filter_map(|&table_id| version_stats.table_stats.get(&table_id))
123            .map(|stats| stats.total_key_size + stats.total_value_size)
124            .sum();
125
126        metrics
127            .materialized_view_stats
128            .with_label_values(&[mv_id.to_string().as_str(), "materialized_view_total_size"])
129            .set(total_size);
130    }
131}
132
/// Reports LSM shape metrics for one compaction group: per-level SST count
/// and size, pending-compaction counts per (select, target) level pair, and
/// L0 sub-level composition. Also logs a human-readable per-level summary at
/// most once every 600s (rate-limited via `time_after_last_observation`).
pub fn trigger_sst_stat(
    metrics: &MetaMetrics,
    compact_status: Option<&CompactStatus>,
    current_version: &HummockVersion,
    compaction_group_id: CompactionGroupId,
) {
    // Number of SSTs in level `level_idx` of this group.
    let level_sst_cnt = |level_idx: usize| {
        let mut sst_num = 0;
        current_version.level_iter(compaction_group_id, |level| {
            if level.level_idx == level_idx as u32 {
                sst_num += level.table_infos.len();
            }
            true
        });
        sst_num
    };
    // Total file size (in KB) of level `level_idx` of this group.
    let level_sst_size = |level_idx: usize| {
        let mut level_sst_size = 0;
        current_version.level_iter(compaction_group_id, |level| {
            if level.level_idx == level_idx as u32 {
                level_sst_size += level.total_file_size;
            }
            true
        });
        level_sst_size / 1024
    };

    // (select_level, target_level) -> number of distinct pending tasks.
    let mut compacting_task_stat: BTreeMap<(usize, usize), usize> = BTreeMap::default();
    for idx in 0..current_version.num_levels(compaction_group_id) {
        let sst_num = level_sst_cnt(idx);
        let level_label = build_level_metrics_label(compaction_group_id, idx);
        metrics
            .level_sst_num
            .with_label_values(&[&level_label])
            .set(sst_num as i64);
        metrics
            .level_file_size
            .with_label_values(&[&level_label])
            .set(level_sst_size(idx) as i64);
        if let Some(compact_status) = compact_status {
            let compact_cnt = compact_status.level_handlers[idx].pending_file_count();
            metrics
                .level_compact_cnt
                .with_label_values(&[&level_label])
                .set(compact_cnt as i64);

            let compacting_task = compact_status.level_handlers[idx].pending_tasks();
            let mut pending_task_ids: HashSet<u64> = HashSet::default();
            for task in compacting_task {
                // Count each task at most once within this level's handler.
                if pending_task_ids.contains(&task.task_id) {
                    continue;
                }

                // A task also appears in its target level's handler; skip it
                // there so it is attributed only to its select level. L0
                // (idx == 0) is exempt so intra-L0 tasks are still counted.
                if idx != 0 && idx == task.target_level as usize {
                    continue;
                }

                let key = (idx, task.target_level as usize);
                let count = compacting_task_stat.entry(key).or_insert(0);
                *count += 1;

                pending_task_ids.insert(task.task_id);
            }
        }
    }

    tracing::debug!("LSM Compacting STAT {:?}", compacting_task_stat);
    for ((select, target), compacting_task_count) in &compacting_task_stat {
        let label_str =
            build_compact_task_stat_metrics_label(compaction_group_id, *select, *target);
        metrics
            .level_compact_task_cnt
            .with_label_values(&[&label_str])
            .set(*compacting_task_count as _);
    }

    // Nothing pending at all: clear previously exported task-count series so
    // they don't linger at stale values.
    if compacting_task_stat.is_empty()
        && let Some(levels) = current_version.levels.get(&compaction_group_id)
    {
        let max_level = levels.levels.len();
        remove_compacting_task_stat(metrics, compaction_group_id, max_level);
    }

    {
        // sub level stat: how L0 is composed of overlapping, non-overlapping
        // and vnode-partitioned non-overlapping sub-levels.
        let overlapping_level_label =
            build_compact_task_l0_stat_metrics_label(compaction_group_id, true, false);
        let non_overlap_level_label =
            build_compact_task_l0_stat_metrics_label(compaction_group_id, false, false);
        let partition_level_label =
            build_compact_task_l0_stat_metrics_label(compaction_group_id, true, true);

        let overlapping_sst_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| sub_level.level_type == LevelType::Overlapping)
                    .count()
            })
            .unwrap_or(0);

        let non_overlap_sst_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| sub_level.level_type == LevelType::Nonoverlapping)
                    .count()
            })
            .unwrap_or(0);

        // Non-overlapping sub-levels that have been partitioned by vnode.
        let partition_level_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| {
                        sub_level.level_type == LevelType::Nonoverlapping
                            && sub_level.vnode_partition_count > 0
                    })
                    .count()
            })
            .unwrap_or(0);
        metrics
            .level_sst_num
            .with_label_values(&[&overlapping_level_label])
            .set(overlapping_sst_num as i64);

        metrics
            .level_sst_num
            .with_label_values(&[&non_overlap_level_label])
            .set(non_overlap_sst_num as i64);

        metrics
            .level_sst_num
            .with_label_values(&[&partition_level_label])
            .set(partition_level_num as i64);
    }

    // Rate-limit the textual summary below to once per 600s; the CAS lets
    // exactly one concurrent caller win the right to log for this window.
    let previous_time = metrics.time_after_last_observation.load(Ordering::Relaxed);
    let current_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    if current_time > 600 + previous_time
        && metrics
            .time_after_last_observation
            .compare_exchange(
                previous_time,
                current_time,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_ok()
        && let Some(compact_status) = compact_status
    {
        for (idx, level_handler) in enumerate(compact_status.level_handlers.iter()) {
            let sst_num = level_sst_cnt(idx);
            let sst_size = level_sst_size(idx);
            let compact_cnt = level_handler.pending_file_count();
            tracing::info!(
                "Level {} has {} SSTs, the total size of which is {}KB, while {} of those are being compacted to bottom levels",
                idx,
                sst_num,
                sst_size,
                compact_cnt,
            );
        }
    }
}
313
314pub fn trigger_epoch_stat(metrics: &MetaMetrics, version: &HummockVersion) {
315    metrics.max_committed_epoch.set(
316        version
317            .state_table_info
318            .info()
319            .values()
320            .map(|i| i.committed_epoch)
321            .max()
322            .unwrap_or(0) as _,
323    );
324    metrics.min_committed_epoch.set(
325        version
326            .state_table_info
327            .info()
328            .values()
329            .map(|i| i.committed_epoch)
330            .min()
331            .unwrap_or(0) as _,
332    );
333}
334
335pub fn remove_compaction_group_in_sst_stat(
336    metrics: &MetaMetrics,
337    compaction_group_id: CompactionGroupId,
338    max_level: usize,
339) {
340    let mut idx = 0;
341    loop {
342        let level_label = build_level_metrics_label(compaction_group_id, idx);
343        let should_continue = metrics
344            .level_sst_num
345            .remove_label_values(&[&level_label])
346            .is_ok();
347
348        metrics
349            .level_file_size
350            .remove_label_values(&[&level_label])
351            .ok();
352
353        metrics
354            .level_compact_cnt
355            .remove_label_values(&[&level_label])
356            .ok();
357        if !should_continue {
358            break;
359        }
360        idx += 1;
361    }
362
363    let overlapping_level_label = build_level_l0_metrics_label(compaction_group_id, true);
364    let non_overlap_level_label = build_level_l0_metrics_label(compaction_group_id, false);
365    metrics
366        .level_sst_num
367        .remove_label_values(&[&overlapping_level_label])
368        .ok();
369    metrics
370        .level_sst_num
371        .remove_label_values(&[&non_overlap_level_label])
372        .ok();
373
374    remove_compacting_task_stat(metrics, compaction_group_id, max_level);
375    remove_split_stat(metrics, compaction_group_id);
376    remove_compact_task_metrics(metrics, compaction_group_id, max_level);
377    remove_compaction_group_stat(metrics, compaction_group_id);
378}
379
380pub fn remove_compacting_task_stat(
381    metrics: &MetaMetrics,
382    compaction_group_id: CompactionGroupId,
383    max_level: usize,
384) {
385    for select_level in 0..=max_level {
386        for target_level in 0..=max_level {
387            let label_str = build_compact_task_stat_metrics_label(
388                compaction_group_id,
389                select_level,
390                target_level,
391            );
392            metrics
393                .level_compact_task_cnt
394                .remove_label_values(&[&label_str])
395                .ok();
396        }
397    }
398}
399
400pub fn remove_split_stat(metrics: &MetaMetrics, compaction_group_id: CompactionGroupId) {
401    let label_str = compaction_group_id.to_string();
402    metrics
403        .state_table_count
404        .remove_label_values(&[&label_str])
405        .ok();
406
407    metrics
408        .branched_sst_count
409        .remove_label_values(&[&label_str])
410        .ok();
411}
412
413pub fn remove_compaction_group_stat(metrics: &MetaMetrics, compaction_group_id: CompactionGroupId) {
414    let label_str = compaction_group_id.to_string();
415    metrics
416        .compaction_group_size
417        .remove_label_values(&[&label_str])
418        .ok();
419    metrics
420        .compaction_group_file_count
421        .remove_label_values(&[&label_str])
422        .ok();
423    metrics
424        .compaction_group_throughput
425        .remove_label_values(&[&label_str])
426        .ok();
427
428    metrics
429        .split_compaction_group_count
430        .remove_label_values(&[&label_str])
431        .ok();
432
433    metrics
434        .merge_compaction_group_count
435        .remove_label_values(&[&label_str])
436        .ok();
437}
438
439pub fn trigger_pin_unpin_version_state(
440    metrics: &MetaMetrics,
441    pinned_versions: &BTreeMap<HummockContextId, HummockPinnedVersion>,
442) {
443    if let Some(m) = pinned_versions.values().map(|v| v.min_pinned_id).min() {
444        metrics.min_pinned_version_id.set(m as i64);
445    } else {
446        metrics
447            .min_pinned_version_id
448            .set(HummockVersionId::MAX.to_u64() as _);
449    }
450}
451
452pub fn trigger_gc_stat(
453    metrics: &MetaMetrics,
454    checkpoint: &HummockVersionCheckpoint,
455    min_pinned_version_id: HummockVersionId,
456) {
457    let current_version_object_size_map = object_size_map(&checkpoint.version);
458    let current_version_object_size = current_version_object_size_map.values().sum::<u64>();
459    let current_version_object_count = current_version_object_size_map.len();
460    let mut old_version_object_size = 0;
461    let mut old_version_object_count = 0;
462    let mut stale_object_size = 0;
463    let mut stale_object_count = 0;
464    checkpoint.stale_objects.iter().for_each(|(id, objects)| {
465        if *id <= min_pinned_version_id {
466            stale_object_size += objects.total_file_size;
467            stale_object_count += objects.id.len() as u64;
468        } else {
469            old_version_object_size += objects.total_file_size;
470            old_version_object_count += objects.id.len() as u64;
471        }
472    });
473    metrics
474        .current_version_object_size
475        .set(current_version_object_size as _);
476    metrics
477        .current_version_object_count
478        .set(current_version_object_count as _);
479    metrics
480        .old_version_object_size
481        .set(old_version_object_size as _);
482    metrics
483        .old_version_object_count
484        .set(old_version_object_count as _);
485    metrics.stale_object_size.set(stale_object_size as _);
486    metrics.stale_object_count.set(stale_object_count as _);
487    // table change log
488    for (table_id, logs) in &checkpoint.version.table_change_log {
489        let object_count = logs
490            .iter()
491            .map(|l| l.old_value.len() + l.new_value.len())
492            .sum::<usize>();
493        let object_size = logs
494            .iter()
495            .map(|l| {
496                l.old_value
497                    .iter()
498                    .chain(l.new_value.iter())
499                    .map(|s| s.file_size as usize)
500                    .sum::<usize>()
501            })
502            .sum::<usize>();
503        metrics
504            .table_change_log_object_count
505            .with_label_values(&[&format!("{table_id}")])
506            .set(object_count as _);
507        metrics
508            .table_change_log_object_size
509            .with_label_values(&[&format!("{table_id}")])
510            .set(object_size as _);
511    }
512}
513
// Triggers a report on compact_pending_bytes_needed
/// Reports `compact_pending_bytes` and per-level compression ratios
/// (`total_file_size / uncompressed_file_size`) for one compaction group.
pub fn trigger_lsm_stat(
    metrics: &MetaMetrics,
    compaction_config: Arc<CompactionConfig>,
    levels: &Levels,
    compaction_group_id: CompactionGroupId,
) {
    let group_label = compaction_group_id.to_string();
    // compact_pending_bytes
    // we don't actually generate a compaction task here so developer config can be ignored.
    let dynamic_level_core = DynamicLevelSelectorCore::new(
        compaction_config.clone(),
        Arc::new(CompactionDeveloperConfig::default()),
    );
    let ctx = dynamic_level_core.calculate_level_base_size(levels);
    {
        let compact_pending_bytes_needed =
            dynamic_level_core.compact_pending_bytes_needed_with_ctx(levels, &ctx);

        metrics
            .compact_pending_bytes
            .with_label_values(&[&group_label])
            .set(compact_pending_bytes_needed as _);
    }

    {
        // compact_level_compression_ratio
        let level_compression_ratio = levels
            .levels
            .iter()
            .map(|level| {
                // Use 0.0 for empty levels to avoid dividing by zero.
                let ratio = if level.uncompressed_file_size == 0 {
                    0.0
                } else {
                    level.total_file_size as f64 / level.uncompressed_file_size as f64
                };

                (level.level_idx, ratio)
            })
            .collect_vec();

        for (level_index, compression_ratio) in level_compression_ratio {
            // Label the ratio with the compression algorithm configured for
            // this level, given the computed base level.
            let compression_algorithm_label = get_compression_algorithm(
                compaction_config.as_ref(),
                ctx.base_level,
                level_index as usize,
            );

            metrics
                .compact_level_compression_ratio
                .with_label_values(&[
                    &group_label,
                    &level_index.to_string(),
                    &compression_algorithm_label,
                ])
                .set(compression_ratio);
        }
    }
}
573
574pub fn trigger_write_stop_stats(
575    metrics: &MetaMetrics,
576    write_limit: &HashMap<CompactionGroupId, WriteLimit>,
577) {
578    metrics.write_stop_compaction_groups.reset();
579    for cg in write_limit.keys() {
580        metrics
581            .write_stop_compaction_groups
582            .with_label_values(&[&cg.to_string()])
583            .set(1);
584    }
585}
586
587pub fn trigger_split_stat(metrics: &MetaMetrics, version: &HummockVersion) {
588    let branched_ssts = version.build_branched_sst_info();
589
590    for compaction_group_id in version.levels.keys() {
591        let group_label = compaction_group_id.to_string();
592        metrics
593            .state_table_count
594            .with_label_values(&[&group_label])
595            .set(
596                version
597                    .state_table_info
598                    .compaction_group_member_table_ids(*compaction_group_id)
599                    .len() as _,
600            );
601
602        let branched_sst_count: usize = branched_ssts
603            .values()
604            .map(|branched_map| {
605                branched_map
606                    .keys()
607                    .filter(|group_id| *group_id == compaction_group_id)
608                    .count()
609            })
610            .sum();
611
612        metrics
613            .branched_sst_count
614            .with_label_values(&[&group_label])
615            .set(branched_sst_count as _);
616    }
617}
618
/// Metric label for one level of a compaction group, e.g. `cg2_L0`.
pub fn build_level_metrics_label(compaction_group_id: u64, level_idx: usize) -> String {
    format!("cg{compaction_group_id}_L{level_idx}")
}
622
/// Metric label for the overlapping or non-overlapping portion of a group's
/// L0, e.g. `cg2_l0_sub_overlapping`.
pub fn build_level_l0_metrics_label(compaction_group_id: u64, overlapping: bool) -> String {
    let suffix = if overlapping {
        "overlapping"
    } else {
        "non_overlap"
    };
    format!("cg{compaction_group_id}_l0_sub_{suffix}")
}
630
/// Metric label for a pending compaction task from `select_level` to
/// `target_level` of a group, e.g. `cg2 L0 -> L1`.
pub fn build_compact_task_stat_metrics_label(
    compaction_group_id: u64,
    select_level: usize,
    target_level: usize,
) -> String {
    format!("cg{compaction_group_id} L{select_level} -> L{target_level}")
}
641
/// Metric label describing a group's L0 sub-level composition. `partition`
/// takes precedence over `overlapping`.
pub fn build_compact_task_l0_stat_metrics_label(
    compaction_group_id: u64,
    overlapping: bool,
    partition: bool,
) -> String {
    let suffix = match (partition, overlapping) {
        (true, _) => "partition",
        (false, true) => "overlapping",
        (false, false) => "non_overlap",
    };
    format!("cg{compaction_group_id}_l0_sub_{suffix}")
}
655
/// Compact level-pair label without a group prefix, e.g. `L0->L1`.
pub fn build_compact_task_level_type_metrics_label(
    select_level: usize,
    target_level: usize,
) -> String {
    format!("L{select_level}->L{target_level}")
}
662
663pub fn remove_compact_task_metrics(
664    metrics: &MetaMetrics,
665    compaction_group_id: CompactionGroupId,
666    max_level: usize,
667) {
668    for select_level in 0..=max_level {
669        for target_level in 0..=max_level {
670            let level_type_label =
671                build_compact_task_level_type_metrics_label(select_level, target_level);
672            let should_continue = metrics
673                .l0_compact_level_count
674                .remove_label_values(&[&compaction_group_id.to_string(), &level_type_label])
675                .is_ok();
676
677            metrics
678                .compact_task_size
679                .remove_label_values(&[&compaction_group_id.to_string(), &level_type_label])
680                .ok();
681
682            metrics
683                .compact_task_size
684                .remove_label_values(&[
685                    &compaction_group_id.to_string(),
686                    &format!("{} uncompressed", level_type_label),
687                ])
688                .ok();
689            if !should_continue {
690                break;
691            }
692        }
693    }
694}
695
696pub fn trigger_compact_tasks_stat(
697    metrics: &MetaMetrics,
698    compact_tasks: &[CompactTask],
699    compact_status: &BTreeMap<CompactionGroupId, CompactStatus>,
700    current_version: &HummockVersion,
701) {
702    let mut task_status_label_map = HashMap::new();
703    let mut task_type_label_map = HashMap::new();
704    let mut group_label_map = HashMap::new();
705
706    for task in compact_tasks {
707        let task_status_label = task_status_label_map
708            .entry(task.task_status)
709            .or_insert_with(|| task.task_status.as_str_name().to_owned());
710
711        let task_type_label = task_type_label_map
712            .entry(task.task_type)
713            .or_insert_with(|| task.task_type.as_str_name().to_owned());
714
715        let group_label = group_label_map
716            .entry(task.compaction_group_id)
717            .or_insert_with(|| task.compaction_group_id.to_string());
718
719        metrics
720            .compact_frequency
721            .with_label_values(&["normal", group_label, task_type_label, task_status_label])
722            .inc();
723    }
724
725    group_label_map.keys().for_each(|group_id| {
726        trigger_sst_stat(
727            metrics,
728            compact_status.get(group_id),
729            current_version,
730            *group_id,
731        );
732    });
733}