risingwave_meta/hummock/
metrics_utils.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17use std::sync::atomic::Ordering;
18use std::time::{SystemTime, UNIX_EPOCH};
19
20use itertools::{Itertools, enumerate};
21use prometheus::IntGauge;
22use prometheus::core::{AtomicU64, GenericCounter};
23use risingwave_common::id::{JobId, TableId};
24use risingwave_hummock_sdk::compact_task::CompactTask;
25use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
26use risingwave_hummock_sdk::level::Levels;
27use risingwave_hummock_sdk::table_stats::PbTableStatsMap;
28use risingwave_hummock_sdk::version::HummockVersion;
29use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId, HummockVersionId};
30use risingwave_pb::hummock::write_limits::WriteLimit;
31use risingwave_pb::hummock::{
32    CompactionConfig, HummockPinnedVersion, HummockVersionStats, LevelType,
33};
34
35use super::compaction::selector::DynamicLevelSelectorCore;
36use super::compaction::{CompactionDeveloperConfig, get_compression_algorithm};
37use crate::hummock::checkpoint::HummockVersionCheckpoint;
38use crate::hummock::compaction::CompactStatus;
39use crate::rpc::metrics::MetaMetrics;
40
/// Cached per-table Prometheus handles plus a small batching buffer for
/// write throughput, so hot-path updates avoid repeated label lookups.
pub struct LocalTableMetrics {
    // Gauges mirroring the table's entries in `HummockVersionStats`.
    total_key_count: IntGauge,
    total_key_size: IntGauge,
    total_value_size: IntGauge,
    // Counter receiving batched write traffic; flushed in MiB units.
    write_throughput: GenericCounter<AtomicU64>,
    // Number of `inc_write_throughput` calls since the last flush.
    cal_count: usize,
    // Bytes accumulated since the last flush.
    write_size: u64,
}
49
// Flush batched write throughput to the counter after this many calls...
const MIN_FLUSH_COUNT: usize = 16;
// ...or after this many accumulated bytes (128 MiB), whichever comes first.
const MIN_FLUSH_DATA_SIZE: u64 = 128 * 1024 * 1024;
52
53impl LocalTableMetrics {
54    pub fn inc_write_throughput(&mut self, val: u64) {
55        self.write_size += val;
56        self.cal_count += 1;
57        if self.write_size > MIN_FLUSH_DATA_SIZE || self.cal_count > MIN_FLUSH_COUNT {
58            self.write_throughput.inc_by(self.write_size / 1024 / 1024);
59            self.write_size = 0;
60            self.cal_count = 0;
61        }
62    }
63}
64
65pub fn get_or_create_local_table_stat<'a>(
66    metrics: &MetaMetrics,
67    table_id: TableId,
68    local_metrics: &'a mut HashMap<TableId, LocalTableMetrics>,
69) -> &'a mut LocalTableMetrics {
70    local_metrics.entry(table_id).or_insert_with(|| {
71        let table_label = format!("{}", table_id);
72        LocalTableMetrics {
73            total_key_count: metrics
74                .version_stats
75                .with_label_values(&[table_label.as_str(), "total_key_count"]),
76            total_key_size: metrics
77                .version_stats
78                .with_label_values(&[table_label.as_str(), "total_key_size"]),
79            total_value_size: metrics
80                .version_stats
81                .with_label_values(&[table_label.as_str(), "total_value_size"]),
82            write_throughput: metrics
83                .table_write_throughput
84                .with_label_values(&[&table_label]),
85            cal_count: 0,
86            write_size: 0,
87        }
88    })
89}
90
91pub fn trigger_local_table_stat(
92    metrics: &MetaMetrics,
93    local_metrics: &mut HashMap<TableId, LocalTableMetrics>,
94    version_stats: &HummockVersionStats,
95    table_stats_change: &PbTableStatsMap,
96) {
97    for (table_id, stats) in table_stats_change {
98        if stats.total_key_size == 0 && stats.total_value_size == 0 && stats.total_key_count == 0 {
99            continue;
100        }
101        let table_metrics = get_or_create_local_table_stat(metrics, *table_id, local_metrics);
102        if let Some(table_stats) = version_stats.table_stats.get(table_id) {
103            table_metrics
104                .total_key_count
105                .set(table_stats.total_key_count);
106            table_metrics
107                .total_value_size
108                .set(table_stats.total_value_size);
109            table_metrics.total_key_size.set(table_stats.total_key_size);
110        }
111    }
112}
113
114pub fn trigger_mv_stat(
115    metrics: &MetaMetrics,
116    version_stats: &HummockVersionStats,
117    mv_id_to_all_table_ids: Vec<(JobId, Vec<TableId>)>,
118) {
119    metrics.materialized_view_stats.reset();
120    for (mv_id, all_table_ids) in mv_id_to_all_table_ids {
121        let total_size = all_table_ids
122            .iter()
123            .filter_map(|&table_id| version_stats.table_stats.get(&table_id))
124            .map(|stats| stats.total_key_size + stats.total_value_size)
125            .sum();
126
127        metrics
128            .materialized_view_stats
129            .with_label_values(&[mv_id.to_string().as_str(), "materialized_view_total_size"])
130            .set(total_size);
131    }
132}
133
/// Refreshes per-level SST metrics for one compaction group: file counts and
/// sizes per level, pending-compaction counts, distinct compacting-task counts
/// per (select, target) level pair, and L0 sub-level breakdowns. Also logs a
/// per-level summary, throttled to at most once every 10 minutes.
pub fn trigger_sst_stat(
    metrics: &MetaMetrics,
    compact_status: Option<&CompactStatus>,
    current_version: &HummockVersion,
    compaction_group_id: CompactionGroupId,
) {
    // Number of SSTs in the given level of this compaction group.
    let level_sst_cnt = |level_idx: usize| {
        let mut sst_num = 0;
        current_version.level_iter(compaction_group_id, |level| {
            if level.level_idx == level_idx as u32 {
                sst_num += level.table_infos.len();
            }
            true
        });
        sst_num
    };
    // Total file size of the given level, in KB.
    let level_sst_size = |level_idx: usize| {
        let mut level_sst_size = 0;
        current_version.level_iter(compaction_group_id, |level| {
            if level.level_idx == level_idx as u32 {
                level_sst_size += level.total_file_size;
            }
            true
        });
        level_sst_size / 1024
    };

    // (select_level, target_level) -> number of distinct pending compaction tasks.
    let mut compacting_task_stat: BTreeMap<(usize, usize), usize> = BTreeMap::default();
    for idx in 0..current_version.num_levels(compaction_group_id) {
        let sst_num = level_sst_cnt(idx);
        let level_label = build_level_metrics_label(compaction_group_id, idx);
        metrics
            .level_sst_num
            .with_label_values(&[&level_label])
            .set(sst_num as i64);
        metrics
            .level_file_size
            .with_label_values(&[&level_label])
            .set(level_sst_size(idx) as i64);
        // Pending-compaction stats are only available when the in-memory
        // compact status for this group was supplied.
        if let Some(compact_status) = compact_status {
            let compact_cnt = compact_status.level_handlers[idx].pending_file_count();
            metrics
                .level_compact_cnt
                .with_label_values(&[&level_label])
                .set(compact_cnt as i64);

            let compacting_task = compact_status.level_handlers[idx].pending_tasks();
            // Each task is counted at most once per level.
            let mut pending_task_ids: HashSet<u64> = HashSet::default();
            for task in compacting_task {
                if pending_task_ids.contains(&task.task_id) {
                    continue;
                }

                // Skip the target-level handler's view of a task (except for
                // intra-L0 tasks where select == target == 0), so the task is
                // attributed to its select level only.
                if idx != 0 && idx == task.target_level as usize {
                    continue;
                }

                let key = (idx, task.target_level as usize);
                let count = compacting_task_stat.entry(key).or_insert(0);
                *count += 1;

                pending_task_ids.insert(task.task_id);
            }
        }
    }

    tracing::debug!("LSM Compacting STAT {:?}", compacting_task_stat);
    for ((select, target), compacting_task_count) in &compacting_task_stat {
        let label_str =
            build_compact_task_stat_metrics_label(compaction_group_id, *select, *target);
        metrics
            .level_compact_task_cnt
            .with_label_values(&[&label_str])
            .set(*compacting_task_count as _);
    }

    // No pending tasks at all: clear every per-pair gauge for this group so
    // stale values don't linger.
    if compacting_task_stat.is_empty()
        && let Some(levels) = current_version.levels.get(&compaction_group_id)
    {
        let max_level = levels.levels.len();
        remove_compacting_task_stat(metrics, compaction_group_id, max_level);
    }

    {
        // sub level stat: break L0 down into overlapping, non-overlapping, and
        // vnode-partitioned non-overlapping sub-levels.
        let overlapping_level_label =
            build_compact_task_l0_stat_metrics_label(compaction_group_id, true, false);
        let non_overlap_level_label =
            build_compact_task_l0_stat_metrics_label(compaction_group_id, false, false);
        let partition_level_label =
            build_compact_task_l0_stat_metrics_label(compaction_group_id, true, true);

        let overlapping_sst_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| sub_level.level_type == LevelType::Overlapping)
                    .count()
            })
            .unwrap_or(0);

        let non_overlap_sst_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| sub_level.level_type == LevelType::Nonoverlapping)
                    .count()
            })
            .unwrap_or(0);

        // Non-overlapping sub-levels that are additionally partitioned by vnode.
        let partition_level_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| {
                        sub_level.level_type == LevelType::Nonoverlapping
                            && sub_level.vnode_partition_count > 0
                    })
                    .count()
            })
            .unwrap_or(0);
        metrics
            .level_sst_num
            .with_label_values(&[&overlapping_level_label])
            .set(overlapping_sst_num as i64);

        metrics
            .level_sst_num
            .with_label_values(&[&non_overlap_level_label])
            .set(non_overlap_sst_num as i64);

        metrics
            .level_sst_num
            .with_label_values(&[&partition_level_label])
            .set(partition_level_num as i64);
    }

    // Throttled logging: log the per-level summary at most once every 600s.
    // The CAS ensures only one concurrent caller wins the right to log.
    let previous_time = metrics.time_after_last_observation.load(Ordering::Relaxed);
    let current_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    if current_time > 600 + previous_time
        && metrics
            .time_after_last_observation
            .compare_exchange(
                previous_time,
                current_time,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_ok()
        && let Some(compact_status) = compact_status
    {
        for (idx, level_handler) in enumerate(compact_status.level_handlers.iter()) {
            let sst_num = level_sst_cnt(idx);
            let sst_size = level_sst_size(idx);
            let compact_cnt = level_handler.pending_file_count();
            tracing::info!(
                "Level {} has {} SSTs, the total size of which is {}KB, while {} of those are being compacted to bottom levels",
                idx,
                sst_num,
                sst_size,
                compact_cnt,
            );
        }
    }
}
314
315pub fn trigger_epoch_stat(metrics: &MetaMetrics, version: &HummockVersion) {
316    metrics.max_committed_epoch.set(
317        version
318            .state_table_info
319            .info()
320            .values()
321            .map(|i| i.committed_epoch)
322            .max()
323            .unwrap_or(0) as _,
324    );
325    metrics.min_committed_epoch.set(
326        version
327            .state_table_info
328            .info()
329            .values()
330            .map(|i| i.committed_epoch)
331            .min()
332            .unwrap_or(0) as _,
333    );
334}
335
336pub fn remove_compaction_group_in_sst_stat(
337    metrics: &MetaMetrics,
338    compaction_group_id: CompactionGroupId,
339    max_level: usize,
340) {
341    let mut idx = 0;
342    loop {
343        let level_label = build_level_metrics_label(compaction_group_id, idx);
344        let should_continue = metrics
345            .level_sst_num
346            .remove_label_values(&[&level_label])
347            .is_ok();
348
349        metrics
350            .level_file_size
351            .remove_label_values(&[&level_label])
352            .ok();
353
354        metrics
355            .level_compact_cnt
356            .remove_label_values(&[&level_label])
357            .ok();
358        if !should_continue {
359            break;
360        }
361        idx += 1;
362    }
363
364    let overlapping_level_label = build_level_l0_metrics_label(compaction_group_id, true);
365    let non_overlap_level_label = build_level_l0_metrics_label(compaction_group_id, false);
366    metrics
367        .level_sst_num
368        .remove_label_values(&[&overlapping_level_label])
369        .ok();
370    metrics
371        .level_sst_num
372        .remove_label_values(&[&non_overlap_level_label])
373        .ok();
374
375    remove_compacting_task_stat(metrics, compaction_group_id, max_level);
376    remove_split_stat(metrics, compaction_group_id);
377    remove_compact_task_metrics(metrics, compaction_group_id, max_level);
378    remove_compaction_group_stat(metrics, compaction_group_id);
379}
380
381pub fn remove_compacting_task_stat(
382    metrics: &MetaMetrics,
383    compaction_group_id: CompactionGroupId,
384    max_level: usize,
385) {
386    for select_level in 0..=max_level {
387        for target_level in 0..=max_level {
388            let label_str = build_compact_task_stat_metrics_label(
389                compaction_group_id,
390                select_level,
391                target_level,
392            );
393            metrics
394                .level_compact_task_cnt
395                .remove_label_values(&[&label_str])
396                .ok();
397        }
398    }
399}
400
401pub fn remove_split_stat(metrics: &MetaMetrics, compaction_group_id: CompactionGroupId) {
402    let label_str = compaction_group_id.to_string();
403    metrics
404        .state_table_count
405        .remove_label_values(&[&label_str])
406        .ok();
407
408    metrics
409        .branched_sst_count
410        .remove_label_values(&[&label_str])
411        .ok();
412}
413
414pub fn remove_compaction_group_stat(metrics: &MetaMetrics, compaction_group_id: CompactionGroupId) {
415    let label_str = compaction_group_id.to_string();
416    metrics
417        .compaction_group_size
418        .remove_label_values(&[&label_str])
419        .ok();
420    metrics
421        .compaction_group_file_count
422        .remove_label_values(&[&label_str])
423        .ok();
424    metrics
425        .compaction_group_throughput
426        .remove_label_values(&[&label_str])
427        .ok();
428
429    metrics
430        .split_compaction_group_count
431        .remove_label_values(&[&label_str])
432        .ok();
433
434    metrics
435        .merge_compaction_group_count
436        .remove_label_values(&[&label_str])
437        .ok();
438}
439
440pub fn trigger_pin_unpin_version_state(
441    metrics: &MetaMetrics,
442    pinned_versions: &BTreeMap<HummockContextId, HummockPinnedVersion>,
443) {
444    if let Some(m) = pinned_versions.values().map(|v| v.min_pinned_id).min() {
445        metrics.min_pinned_version_id.set(m.as_i64_id());
446    } else {
447        metrics.min_pinned_version_id.set(u64::MAX as _);
448    }
449}
450
/// Reports GC-related object stats from the checkpoint: size/count of objects
/// in the current version, objects only referenced by old (but still pinned)
/// versions, stale objects eligible for deletion, and per-table change-log
/// object stats.
pub fn trigger_gc_stat(
    metrics: &MetaMetrics,
    checkpoint: &HummockVersionCheckpoint,
    min_pinned_version_id: HummockVersionId,
) {
    let current_version_object_size_map = object_size_map(&checkpoint.version);
    let current_version_object_size = current_version_object_size_map.values().sum::<u64>();
    let current_version_object_count = current_version_object_size_map.len();
    let mut old_version_object_size = 0;
    let mut old_version_object_count = 0;
    let mut stale_object_size = 0;
    let mut stale_object_count = 0;
    // Partition stale objects by the version id they became stale at:
    // at or below the min pinned id they are truly stale (collectable);
    // above it they are still reachable from some pinned old version.
    checkpoint.stale_objects.iter().for_each(|(id, objects)| {
        if *id <= min_pinned_version_id {
            stale_object_size += objects.total_file_size;
            stale_object_count += objects.id.len() as u64;
        } else {
            old_version_object_size += objects.total_file_size;
            old_version_object_count += objects.id.len() as u64;
        }
    });
    metrics
        .current_version_object_size
        .set(current_version_object_size as _);
    metrics
        .current_version_object_count
        .set(current_version_object_count as _);
    metrics
        .old_version_object_size
        .set(old_version_object_size as _);
    metrics
        .old_version_object_count
        .set(old_version_object_count as _);
    metrics.stale_object_size.set(stale_object_size as _);
    metrics.stale_object_count.set(stale_object_count as _);
    // table change log
    for (table_id, logs) in &checkpoint.version.table_change_log {
        let table_id_label = table_id.to_string();
        let labels = [table_id_label.as_str()];
        // Count and size cover both old-value and new-value SSTs of every log entry.
        let object_count = logs
            .iter()
            .map(|l| l.old_value.len() + l.new_value.len())
            .sum::<usize>();
        let object_size = logs
            .iter()
            .map(|l| {
                l.old_value
                    .iter()
                    .chain(l.new_value.iter())
                    .map(|s| s.file_size as usize)
                    .sum::<usize>()
            })
            .sum::<usize>();
        metrics
            .table_change_log_object_count
            .with_label_values(&labels)
            .set(object_count as _);
        metrics
            .table_change_log_object_size
            .with_label_values(&labels)
            .set(object_size as _);
        // Oldest epoch still retained in the change log (0 when empty).
        let min_epoch = logs.epochs().min().unwrap_or_default();
        metrics
            .table_change_log_min_epoch
            .with_label_values(&labels)
            .set(min_epoch as _);
    }
}
519
520// Triggers a report on compact_pending_bytes_needed
521pub fn trigger_lsm_stat(
522    metrics: &MetaMetrics,
523    compaction_config: Arc<CompactionConfig>,
524    levels: &Levels,
525    compaction_group_id: CompactionGroupId,
526) {
527    let group_label = compaction_group_id.to_string();
528    // compact_pending_bytes
529    // we don't actually generate a compaction task here so developer config can be ignored.
530    let dynamic_level_core = DynamicLevelSelectorCore::new(
531        compaction_config.clone(),
532        Arc::new(CompactionDeveloperConfig::default()),
533    );
534    let ctx = dynamic_level_core.calculate_level_base_size(levels);
535    {
536        let compact_pending_bytes_needed =
537            dynamic_level_core.compact_pending_bytes_needed_with_ctx(levels, &ctx);
538
539        metrics
540            .compact_pending_bytes
541            .with_label_values(&[&group_label])
542            .set(compact_pending_bytes_needed as _);
543    }
544
545    {
546        // compact_level_compression_ratio
547        let level_compression_ratio = levels
548            .levels
549            .iter()
550            .map(|level| {
551                let ratio = if level.uncompressed_file_size == 0 {
552                    0.0
553                } else {
554                    level.total_file_size as f64 / level.uncompressed_file_size as f64
555                };
556
557                (level.level_idx, ratio)
558            })
559            .collect_vec();
560
561        for (level_index, compression_ratio) in level_compression_ratio {
562            let compression_algorithm_label = get_compression_algorithm(
563                compaction_config.as_ref(),
564                ctx.base_level,
565                level_index as usize,
566            );
567
568            metrics
569                .compact_level_compression_ratio
570                .with_label_values(&[
571                    &group_label,
572                    &level_index.to_string(),
573                    &compression_algorithm_label,
574                ])
575                .set(compression_ratio);
576        }
577    }
578}
579
580pub fn trigger_write_stop_stats(
581    metrics: &MetaMetrics,
582    write_limit: &HashMap<CompactionGroupId, WriteLimit>,
583) {
584    metrics.write_stop_compaction_groups.reset();
585    for cg in write_limit.keys() {
586        metrics
587            .write_stop_compaction_groups
588            .with_label_values(&[&cg.to_string()])
589            .set(1);
590    }
591}
592
593pub fn trigger_split_stat(metrics: &MetaMetrics, version: &HummockVersion) {
594    let branched_ssts = version.build_branched_sst_info();
595
596    for compaction_group_id in version.levels.keys() {
597        let group_label = compaction_group_id.to_string();
598        metrics
599            .state_table_count
600            .with_label_values(&[&group_label])
601            .set(
602                version
603                    .state_table_info
604                    .compaction_group_member_table_ids(*compaction_group_id)
605                    .len() as _,
606            );
607
608        let branched_sst_count: usize = branched_ssts
609            .values()
610            .map(|branched_map| {
611                branched_map
612                    .keys()
613                    .filter(|group_id| *group_id == compaction_group_id)
614                    .count()
615            })
616            .sum();
617
618        metrics
619            .branched_sst_count
620            .with_label_values(&[&group_label])
621            .set(branched_sst_count as _);
622    }
623}
624
625pub fn build_level_metrics_label(
626    compaction_group_id: CompactionGroupId,
627    level_idx: usize,
628) -> String {
629    format!("cg{}_L{}", compaction_group_id, level_idx)
630}
631
632pub fn build_level_l0_metrics_label(
633    compaction_group_id: CompactionGroupId,
634    overlapping: bool,
635) -> String {
636    if overlapping {
637        format!("cg{}_l0_sub_overlapping", compaction_group_id)
638    } else {
639        format!("cg{}_l0_sub_non_overlap", compaction_group_id)
640    }
641}
642
643pub fn build_compact_task_stat_metrics_label(
644    compaction_group_id: CompactionGroupId,
645    select_level: usize,
646    target_level: usize,
647) -> String {
648    format!(
649        "cg{} L{} -> L{}",
650        compaction_group_id, select_level, target_level
651    )
652}
653
654pub fn build_compact_task_l0_stat_metrics_label(
655    compaction_group_id: CompactionGroupId,
656    overlapping: bool,
657    partition: bool,
658) -> String {
659    if partition {
660        format!("cg{}_l0_sub_partition", compaction_group_id)
661    } else if overlapping {
662        format!("cg{}_l0_sub_overlapping", compaction_group_id)
663    } else {
664        format!("cg{}_l0_sub_non_overlap", compaction_group_id)
665    }
666}
667
/// Metric label describing a compaction direction between two levels,
/// e.g. `L0->L3`.
pub fn build_compact_task_level_type_metrics_label(
    select_level: usize,
    target_level: usize,
) -> String {
    format!("L{select_level}->L{target_level}")
}
674
675pub fn remove_compact_task_metrics(
676    metrics: &MetaMetrics,
677    compaction_group_id: CompactionGroupId,
678    max_level: usize,
679) {
680    for select_level in 0..=max_level {
681        for target_level in 0..=max_level {
682            let level_type_label =
683                build_compact_task_level_type_metrics_label(select_level, target_level);
684            let should_continue = metrics
685                .l0_compact_level_count
686                .remove_label_values(&[&compaction_group_id.to_string(), &level_type_label])
687                .is_ok();
688
689            metrics
690                .compact_task_size
691                .remove_label_values(&[&compaction_group_id.to_string(), &level_type_label])
692                .ok();
693
694            metrics
695                .compact_task_size
696                .remove_label_values(&[
697                    &compaction_group_id.to_string(),
698                    &format!("{} uncompressed", level_type_label),
699                ])
700                .ok();
701            if !should_continue {
702                break;
703            }
704        }
705    }
706}
707
708pub fn trigger_compact_tasks_stat(
709    metrics: &MetaMetrics,
710    compact_tasks: &[CompactTask],
711    compact_status: &BTreeMap<CompactionGroupId, CompactStatus>,
712    current_version: &HummockVersion,
713) {
714    let mut task_status_label_map = HashMap::new();
715    let mut task_type_label_map = HashMap::new();
716    let mut group_label_map = HashMap::new();
717
718    for task in compact_tasks {
719        let task_status_label = task_status_label_map
720            .entry(task.task_status)
721            .or_insert_with(|| task.task_status.as_str_name().to_owned());
722
723        let task_type_label = task_type_label_map
724            .entry(task.task_type)
725            .or_insert_with(|| task.task_type.as_str_name().to_owned());
726
727        let group_label = group_label_map
728            .entry(task.compaction_group_id)
729            .or_insert_with(|| task.compaction_group_id.to_string());
730
731        metrics
732            .compact_frequency
733            .with_label_values(&["normal", group_label, task_type_label, task_status_label])
734            .inc();
735    }
736
737    group_label_map.keys().for_each(|group_id| {
738        trigger_sst_stat(
739            metrics,
740            compact_status.get(group_id),
741            current_version,
742            *group_id,
743        );
744    });
745}