1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17use std::sync::atomic::Ordering;
18use std::time::{SystemTime, UNIX_EPOCH};
19
20use itertools::{Itertools, enumerate};
21use prometheus::IntGauge;
22use prometheus::core::{AtomicU64, GenericCounter};
23use risingwave_common::id::{JobId, TableId};
24use risingwave_hummock_sdk::change_log::TableChangeLog;
25use risingwave_hummock_sdk::compact_task::CompactTask;
26use risingwave_hummock_sdk::compaction_group::hummock_version_ext::version_object_size_map;
27use risingwave_hummock_sdk::level::Levels;
28use risingwave_hummock_sdk::table_stats::PbTableStatsMap;
29use risingwave_hummock_sdk::version::HummockVersion;
30use risingwave_hummock_sdk::{
31 CompactionGroupId, HummockContextId, HummockObjectId, HummockVersionId,
32};
33use risingwave_pb::hummock::write_limits::WriteLimit;
34use risingwave_pb::hummock::{
35 CompactionConfig, HummockPinnedVersion, HummockVersionStats, LevelType,
36};
37
38use super::compaction::selector::DynamicLevelSelectorCore;
39use super::compaction::{CompactionDeveloperConfig, get_compression_algorithm};
40use crate::hummock::checkpoint::HummockVersionCheckpoint;
41use crate::hummock::compaction::CompactStatus;
42use crate::rpc::metrics::MetaMetrics;
43
/// Cached per-table metric handles plus local accumulators.
///
/// Holding the labeled gauge/counter handles here avoids a label lookup in the
/// Prometheus registry on every update. `cal_count` / `write_size` batch
/// write-throughput updates so the counter is only bumped periodically (see
/// `inc_write_throughput`).
pub struct LocalTableMetrics {
    /// Gauge for the table's total key count.
    total_key_count: IntGauge,
    /// Gauge for the table's total key size.
    total_key_size: IntGauge,
    /// Gauge for the table's total value size.
    total_value_size: IntGauge,
    /// Monotonic counter for write throughput, incremented in MiB units.
    write_throughput: GenericCounter<AtomicU64>,
    /// Number of accumulation calls since the last counter flush.
    cal_count: usize,
    /// Bytes accumulated since the last counter flush.
    write_size: u64,
}
52
/// Flush the batched write-throughput counter once more than this many
/// accumulation calls have happened since the last flush.
const MIN_FLUSH_COUNT: usize = 16;
/// ...or once more than this many bytes (128 MiB) have accumulated,
/// whichever threshold is crossed first.
const MIN_FLUSH_DATA_SIZE: u64 = 128 * 1024 * 1024;
55
56impl LocalTableMetrics {
57 pub fn inc_write_throughput(&mut self, val: u64) {
58 self.write_size += val;
59 self.cal_count += 1;
60 if self.write_size > MIN_FLUSH_DATA_SIZE || self.cal_count > MIN_FLUSH_COUNT {
61 self.write_throughput.inc_by(self.write_size / 1024 / 1024);
62 self.write_size = 0;
63 self.cal_count = 0;
64 }
65 }
66}
67
68pub fn get_or_create_local_table_stat<'a>(
69 metrics: &MetaMetrics,
70 table_id: TableId,
71 local_metrics: &'a mut HashMap<TableId, LocalTableMetrics>,
72) -> &'a mut LocalTableMetrics {
73 local_metrics.entry(table_id).or_insert_with(|| {
74 let table_label = format!("{}", table_id);
75 LocalTableMetrics {
76 total_key_count: metrics
77 .version_stats
78 .with_label_values(&[table_label.as_str(), "total_key_count"]),
79 total_key_size: metrics
80 .version_stats
81 .with_label_values(&[table_label.as_str(), "total_key_size"]),
82 total_value_size: metrics
83 .version_stats
84 .with_label_values(&[table_label.as_str(), "total_value_size"]),
85 write_throughput: metrics
86 .table_write_throughput
87 .with_label_values(&[&table_label]),
88 cal_count: 0,
89 write_size: 0,
90 }
91 })
92}
93
94pub fn trigger_local_table_stat(
95 metrics: &MetaMetrics,
96 local_metrics: &mut HashMap<TableId, LocalTableMetrics>,
97 version_stats: &HummockVersionStats,
98 table_stats_change: &PbTableStatsMap,
99) {
100 for (table_id, stats) in table_stats_change {
101 if stats.total_key_size == 0 && stats.total_value_size == 0 && stats.total_key_count == 0 {
102 continue;
103 }
104 let table_metrics = get_or_create_local_table_stat(metrics, *table_id, local_metrics);
105 if let Some(table_stats) = version_stats.table_stats.get(table_id) {
106 table_metrics
107 .total_key_count
108 .set(table_stats.total_key_count);
109 table_metrics
110 .total_value_size
111 .set(table_stats.total_value_size);
112 table_metrics.total_key_size.set(table_stats.total_key_size);
113 }
114 }
115}
116
117pub fn trigger_mv_stat(
118 metrics: &MetaMetrics,
119 version_stats: &HummockVersionStats,
120 mv_id_to_all_table_ids: Vec<(JobId, Vec<TableId>)>,
121) {
122 metrics.materialized_view_stats.reset();
123 for (mv_id, all_table_ids) in mv_id_to_all_table_ids {
124 let total_size = all_table_ids
125 .iter()
126 .filter_map(|&table_id| version_stats.table_stats.get(&table_id))
127 .map(|stats| stats.total_key_size + stats.total_value_size)
128 .sum();
129
130 metrics
131 .materialized_view_stats
132 .with_label_values(&[mv_id.to_string().as_str(), "materialized_view_total_size"])
133 .set(total_size);
134 }
135}
136
/// Publishes per-level SST statistics for one compaction group: SST counts,
/// level file sizes, pending-compaction counts, compacting-task fan-out, and
/// L0 sub-level breakdowns. Also logs a per-level summary at most every ~10
/// minutes (guarded by an atomic timestamp CAS).
///
/// `compact_status` is optional; when absent, only version-derived metrics
/// (SST count/size, L0 sub-level counts) are published.
pub fn trigger_sst_stat(
    metrics: &MetaMetrics,
    compact_status: Option<&CompactStatus>,
    current_version: &HummockVersion,
    compaction_group_id: CompactionGroupId,
) {
    // Counts SSTs in the given level of this group by scanning all levels and
    // matching on level_idx.
    let level_sst_cnt = |level_idx: usize| {
        let mut sst_num = 0;
        current_version.level_iter(compaction_group_id, |level| {
            if level.level_idx == level_idx as u32 {
                sst_num += level.table_infos.len();
            }
            true
        });
        sst_num
    };
    // Total file size (in KiB) of the given level of this group.
    let level_sst_size = |level_idx: usize| {
        let mut level_sst_size = 0;
        current_version.level_iter(compaction_group_id, |level| {
            if level.level_idx == level_idx as u32 {
                level_sst_size += level.total_file_size;
            }
            true
        });
        level_sst_size / 1024
    };

    // (select_level, target_level) -> number of distinct pending tasks.
    let mut compacting_task_stat: BTreeMap<(usize, usize), usize> = BTreeMap::default();
    for idx in 0..current_version.num_levels(compaction_group_id) {
        let sst_num = level_sst_cnt(idx);
        let level_label = build_level_metrics_label(compaction_group_id, idx);
        metrics
            .level_sst_num
            .with_label_values(&[&level_label])
            .set(sst_num as i64);
        metrics
            .level_file_size
            .with_label_values(&[&level_label])
            .set(level_sst_size(idx) as i64);
        if let Some(compact_status) = compact_status {
            let compact_cnt = compact_status.level_handlers[idx].pending_file_count();
            metrics
                .level_compact_cnt
                .with_label_values(&[&level_label])
                .set(compact_cnt as i64);

            let compacting_task = compact_status.level_handlers[idx].pending_tasks();
            // Deduplicate: a task may appear in a handler more than once.
            let mut pending_task_ids: HashSet<u64> = HashSet::default();
            for task in compacting_task {
                if pending_task_ids.contains(&task.task_id) {
                    continue;
                }

                // Skip counting a non-L0 task at its own target level so it is
                // only attributed to its select level.
                if idx != 0 && idx == task.target_level as usize {
                    continue;
                }

                let key = (idx, task.target_level as usize);
                let count = compacting_task_stat.entry(key).or_insert(0);
                *count += 1;

                pending_task_ids.insert(task.task_id);
            }
        }
    }

    tracing::debug!("LSM Compacting STAT {:?}", compacting_task_stat);
    for ((select, target), compacting_task_count) in &compacting_task_stat {
        let label_str =
            build_compact_task_stat_metrics_label(compaction_group_id, *select, *target);
        metrics
            .level_compact_task_cnt
            .with_label_values(&[&label_str])
            .set(*compacting_task_count as _);
    }

    // No pending tasks at all: drop the per-pair gauges instead of leaving
    // stale non-zero values behind.
    if compacting_task_stat.is_empty()
        && let Some(levels) = current_version.levels.get(&compaction_group_id)
    {
        let max_level = levels.levels.len();
        remove_compacting_task_stat(metrics, compaction_group_id, max_level);
    }

    {
        // L0 sub-level breakdown: overlapping vs non-overlapping vs
        // vnode-partitioned (non-overlapping with vnode_partition_count > 0).
        let overlapping_level_label =
            build_compact_task_l0_stat_metrics_label(compaction_group_id, true, false);
        let non_overlap_level_label =
            build_compact_task_l0_stat_metrics_label(compaction_group_id, false, false);
        let partition_level_label =
            build_compact_task_l0_stat_metrics_label(compaction_group_id, true, true);

        let overlapping_sst_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| sub_level.level_type == LevelType::Overlapping)
                    .count()
            })
            .unwrap_or(0);

        let non_overlap_sst_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| sub_level.level_type == LevelType::Nonoverlapping)
                    .count()
            })
            .unwrap_or(0);

        let partition_level_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| {
                        sub_level.level_type == LevelType::Nonoverlapping
                            && sub_level.vnode_partition_count > 0
                    })
                    .count()
            })
            .unwrap_or(0);
        metrics
            .level_sst_num
            .with_label_values(&[&overlapping_level_label])
            .set(overlapping_sst_num as i64);

        metrics
            .level_sst_num
            .with_label_values(&[&non_overlap_level_label])
            .set(non_overlap_sst_num as i64);

        metrics
            .level_sst_num
            .with_label_values(&[&partition_level_label])
            .set(partition_level_num as i64);
    }

    // Rate-limit the verbose per-level log to once per 600s. The CAS ensures
    // only one concurrent caller wins the right to log for this window.
    let previous_time = metrics.time_after_last_observation.load(Ordering::Relaxed);
    let current_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    if current_time > 600 + previous_time
        && metrics
            .time_after_last_observation
            .compare_exchange(
                previous_time,
                current_time,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_ok()
        && let Some(compact_status) = compact_status
    {
        for (idx, level_handler) in enumerate(compact_status.level_handlers.iter()) {
            let sst_num = level_sst_cnt(idx);
            let sst_size = level_sst_size(idx);
            let compact_cnt = level_handler.pending_file_count();
            tracing::info!(
                "Level {} has {} SSTs, the total size of which is {}KB, while {} of those are being compacted to bottom levels",
                idx,
                sst_num,
                sst_size,
                compact_cnt,
            );
        }
    }
}
317
318pub fn trigger_epoch_stat(metrics: &MetaMetrics, version: &HummockVersion) {
319 metrics.max_committed_epoch.set(
320 version
321 .state_table_info
322 .info()
323 .values()
324 .map(|i| i.committed_epoch)
325 .max()
326 .unwrap_or(0) as _,
327 );
328 metrics.min_committed_epoch.set(
329 version
330 .state_table_info
331 .info()
332 .values()
333 .map(|i| i.committed_epoch)
334 .min()
335 .unwrap_or(0) as _,
336 );
337}
338
339pub fn remove_compaction_group_in_sst_stat(
340 metrics: &MetaMetrics,
341 compaction_group_id: CompactionGroupId,
342 max_level: usize,
343) {
344 let mut idx = 0;
345 loop {
346 let level_label = build_level_metrics_label(compaction_group_id, idx);
347 let should_continue = metrics
348 .level_sst_num
349 .remove_label_values(&[&level_label])
350 .is_ok();
351
352 metrics
353 .level_file_size
354 .remove_label_values(&[&level_label])
355 .ok();
356
357 metrics
358 .level_compact_cnt
359 .remove_label_values(&[&level_label])
360 .ok();
361 if !should_continue {
362 break;
363 }
364 idx += 1;
365 }
366
367 let overlapping_level_label = build_level_l0_metrics_label(compaction_group_id, true);
368 let non_overlap_level_label = build_level_l0_metrics_label(compaction_group_id, false);
369 metrics
370 .level_sst_num
371 .remove_label_values(&[&overlapping_level_label])
372 .ok();
373 metrics
374 .level_sst_num
375 .remove_label_values(&[&non_overlap_level_label])
376 .ok();
377
378 remove_compacting_task_stat(metrics, compaction_group_id, max_level);
379 remove_split_stat(metrics, compaction_group_id);
380 remove_compact_task_metrics(metrics, compaction_group_id, max_level);
381 remove_compaction_group_stat(metrics, compaction_group_id);
382}
383
384pub fn remove_compacting_task_stat(
385 metrics: &MetaMetrics,
386 compaction_group_id: CompactionGroupId,
387 max_level: usize,
388) {
389 for select_level in 0..=max_level {
390 for target_level in 0..=max_level {
391 let label_str = build_compact_task_stat_metrics_label(
392 compaction_group_id,
393 select_level,
394 target_level,
395 );
396 metrics
397 .level_compact_task_cnt
398 .remove_label_values(&[&label_str])
399 .ok();
400 }
401 }
402}
403
404pub fn remove_split_stat(metrics: &MetaMetrics, compaction_group_id: CompactionGroupId) {
405 let label_str = compaction_group_id.to_string();
406 metrics
407 .state_table_count
408 .remove_label_values(&[&label_str])
409 .ok();
410
411 metrics
412 .branched_sst_count
413 .remove_label_values(&[&label_str])
414 .ok();
415}
416
417pub fn remove_compaction_group_stat(metrics: &MetaMetrics, compaction_group_id: CompactionGroupId) {
418 let label_str = compaction_group_id.to_string();
419 metrics
420 .compaction_group_size
421 .remove_label_values(&[&label_str])
422 .ok();
423 metrics
424 .compaction_group_file_count
425 .remove_label_values(&[&label_str])
426 .ok();
427 metrics
428 .compaction_group_throughput
429 .remove_label_values(&[&label_str])
430 .ok();
431
432 metrics
433 .split_compaction_group_count
434 .remove_label_values(&[&label_str])
435 .ok();
436
437 metrics
438 .merge_compaction_group_count
439 .remove_label_values(&[&label_str])
440 .ok();
441}
442
443pub fn trigger_pin_unpin_version_state(
444 metrics: &MetaMetrics,
445 pinned_versions: &BTreeMap<HummockContextId, HummockPinnedVersion>,
446) {
447 if let Some(m) = pinned_versions.values().map(|v| v.min_pinned_id).min() {
448 metrics.min_pinned_version_id.set(m.as_i64_id());
449 } else {
450 metrics.min_pinned_version_id.set(u64::MAX as _);
451 }
452}
453
/// Publishes garbage-collection accounting derived from the version
/// checkpoint: object count/size of the current version (including change-log
/// SSTs), of older-but-still-pinned versions, and of stale (collectable)
/// objects, plus per-table change-log object stats.
///
/// An object is classified "stale" when its recording version id is at or
/// below `min_pinned_version_id` (no context can still reference it);
/// otherwise it belongs to an old-but-pinned version.
pub fn trigger_gc_stat(
    metrics: &MetaMetrics,
    checkpoint: &HummockVersionCheckpoint,
    min_pinned_version_id: HummockVersionId,
    current_table_change_log: &HashMap<TableId, TableChangeLog>,
) {
    // Start from the checkpoint version's object->size map, then add every
    // SST referenced by table change logs; map semantics deduplicate objects
    // that appear in both.
    let mut current_version_object_size_map: HashMap<_, _> =
        version_object_size_map(&checkpoint.version);
    current_version_object_size_map.extend(current_table_change_log.values().flat_map(
        |change_log| {
            change_log.iter().flat_map(|s| {
                s.change_log_ssts()
                    .map(|s| (HummockObjectId::Sstable(s.object_id), s.file_size))
            })
        },
    ));
    let current_version_object_size = current_version_object_size_map.values().sum::<u64>();
    let current_version_object_count = current_version_object_size_map.len();
    let mut old_version_object_size = 0;
    let mut old_version_object_count = 0;
    let mut stale_object_size = 0;
    let mut stale_object_count = 0;
    // Partition the checkpoint's stale objects by whether their version is
    // still pinned by some context.
    checkpoint.stale_objects.iter().for_each(|(id, objects)| {
        if *id <= min_pinned_version_id {
            stale_object_size += objects.total_file_size;
            stale_object_count += objects.id.len() as u64;
        } else {
            old_version_object_size += objects.total_file_size;
            old_version_object_count += objects.id.len() as u64;
        }
    });
    metrics
        .current_version_object_size
        .set(current_version_object_size as _);
    metrics
        .current_version_object_count
        .set(current_version_object_count as _);
    metrics
        .old_version_object_size
        .set(old_version_object_size as _);
    metrics
        .old_version_object_count
        .set(old_version_object_count as _);
    metrics.stale_object_size.set(stale_object_size as _);
    metrics.stale_object_count.set(stale_object_count as _);
    // Per-table change-log stats: object count, total size, and the oldest
    // epoch still retained in the log.
    for (table_id, logs) in current_table_change_log {
        let table_id_label = table_id.to_string();
        let labels = [table_id_label.as_str()];
        let object_count = logs
            .iter()
            .map(|l| l.old_value.len() + l.new_value.len())
            .sum::<usize>();
        let object_size = logs
            .iter()
            .map(|l| {
                l.old_value
                    .iter()
                    .chain(l.new_value.iter())
                    .map(|s| s.file_size as usize)
                    .sum::<usize>()
            })
            .sum::<usize>();
        metrics
            .table_change_log_object_count
            .with_label_values(&labels)
            .set(object_count as _);
        metrics
            .table_change_log_object_size
            .with_label_values(&labels)
            .set(object_size as _);
        let min_epoch = logs.epochs().min().unwrap_or_default();
        metrics
            .table_change_log_min_epoch
            .with_label_values(&labels)
            .set(min_epoch as _);
    }
}
534
535pub fn trigger_lsm_stat(
537 metrics: &MetaMetrics,
538 compaction_config: Arc<CompactionConfig>,
539 levels: &Levels,
540 compaction_group_id: CompactionGroupId,
541) {
542 let group_label = compaction_group_id.to_string();
543 let dynamic_level_core = DynamicLevelSelectorCore::new(
546 compaction_config.clone(),
547 Arc::new(CompactionDeveloperConfig::default()),
548 );
549 let ctx = dynamic_level_core.calculate_level_base_size(levels);
550 {
551 let compact_pending_bytes_needed =
552 dynamic_level_core.compact_pending_bytes_needed_with_ctx(levels, &ctx);
553
554 metrics
555 .compact_pending_bytes
556 .with_label_values(&[&group_label])
557 .set(compact_pending_bytes_needed as _);
558 }
559
560 {
561 let level_compression_ratio = levels
563 .levels
564 .iter()
565 .map(|level| {
566 let ratio = if level.uncompressed_file_size == 0 {
567 0.0
568 } else {
569 level.total_file_size as f64 / level.uncompressed_file_size as f64
570 };
571
572 (level.level_idx, ratio)
573 })
574 .collect_vec();
575
576 for (level_index, compression_ratio) in level_compression_ratio {
577 let compression_algorithm_label = get_compression_algorithm(
578 compaction_config.as_ref(),
579 ctx.base_level,
580 level_index as usize,
581 );
582
583 metrics
584 .compact_level_compression_ratio
585 .with_label_values(&[
586 &group_label,
587 &level_index.to_string(),
588 &compression_algorithm_label,
589 ])
590 .set(compression_ratio);
591 }
592 }
593}
594
595pub fn trigger_write_stop_stats(
596 metrics: &MetaMetrics,
597 write_limit: &HashMap<CompactionGroupId, WriteLimit>,
598) {
599 metrics.write_stop_compaction_groups.reset();
600 for cg in write_limit.keys() {
601 metrics
602 .write_stop_compaction_groups
603 .with_label_values(&[&cg.to_string()])
604 .set(1);
605 }
606}
607
608pub fn trigger_split_stat(metrics: &MetaMetrics, version: &HummockVersion) {
609 let branched_ssts = version.build_branched_sst_info();
610
611 for compaction_group_id in version.levels.keys() {
612 let group_label = compaction_group_id.to_string();
613 metrics
614 .state_table_count
615 .with_label_values(&[&group_label])
616 .set(
617 version
618 .state_table_info
619 .compaction_group_member_table_ids(*compaction_group_id)
620 .len() as _,
621 );
622
623 let branched_sst_count: usize = branched_ssts
624 .values()
625 .map(|branched_map| {
626 branched_map
627 .keys()
628 .filter(|group_id| *group_id == compaction_group_id)
629 .count()
630 })
631 .sum();
632
633 metrics
634 .branched_sst_count
635 .with_label_values(&[&group_label])
636 .set(branched_sst_count as _);
637 }
638}
639
640pub fn build_level_metrics_label(
641 compaction_group_id: CompactionGroupId,
642 level_idx: usize,
643) -> String {
644 format!("cg{}_L{}", compaction_group_id, level_idx)
645}
646
647pub fn build_level_l0_metrics_label(
648 compaction_group_id: CompactionGroupId,
649 overlapping: bool,
650) -> String {
651 if overlapping {
652 format!("cg{}_l0_sub_overlapping", compaction_group_id)
653 } else {
654 format!("cg{}_l0_sub_non_overlap", compaction_group_id)
655 }
656}
657
658pub fn build_compact_task_stat_metrics_label(
659 compaction_group_id: CompactionGroupId,
660 select_level: usize,
661 target_level: usize,
662) -> String {
663 format!(
664 "cg{} L{} -> L{}",
665 compaction_group_id, select_level, target_level
666 )
667}
668
669pub fn build_compact_task_l0_stat_metrics_label(
670 compaction_group_id: CompactionGroupId,
671 overlapping: bool,
672 partition: bool,
673) -> String {
674 if partition {
675 format!("cg{}_l0_sub_partition", compaction_group_id)
676 } else if overlapping {
677 format!("cg{}_l0_sub_overlapping", compaction_group_id)
678 } else {
679 format!("cg{}_l0_sub_non_overlap", compaction_group_id)
680 }
681}
682
/// Builds the select->target level-pair label, e.g. `L0->L4`.
pub fn build_compact_task_level_type_metrics_label(
    select_level: usize,
    target_level: usize,
) -> String {
    format!("L{select_level}->L{target_level}")
}
689
690pub fn remove_compact_task_metrics(
691 metrics: &MetaMetrics,
692 compaction_group_id: CompactionGroupId,
693 max_level: usize,
694) {
695 for select_level in 0..=max_level {
696 for target_level in 0..=max_level {
697 let level_type_label =
698 build_compact_task_level_type_metrics_label(select_level, target_level);
699 let should_continue = metrics
700 .l0_compact_level_count
701 .remove_label_values(&[&compaction_group_id.to_string(), &level_type_label])
702 .is_ok();
703
704 metrics
705 .compact_task_size
706 .remove_label_values(&[&compaction_group_id.to_string(), &level_type_label])
707 .ok();
708
709 metrics
710 .compact_task_size
711 .remove_label_values(&[
712 &compaction_group_id.to_string(),
713 &format!("{} uncompressed", level_type_label),
714 ])
715 .ok();
716 if !should_continue {
717 break;
718 }
719 }
720 }
721}
722
723pub fn trigger_compact_tasks_stat(
724 metrics: &MetaMetrics,
725 compact_tasks: &[CompactTask],
726 compact_status: &BTreeMap<CompactionGroupId, CompactStatus>,
727 current_version: &HummockVersion,
728) {
729 let mut task_status_label_map = HashMap::new();
730 let mut task_type_label_map = HashMap::new();
731 let mut group_label_map = HashMap::new();
732
733 for task in compact_tasks {
734 let task_status_label = task_status_label_map
735 .entry(task.task_status)
736 .or_insert_with(|| task.task_status.as_str_name().to_owned());
737
738 let task_type_label = task_type_label_map
739 .entry(task.task_type)
740 .or_insert_with(|| task.task_type.as_str_name().to_owned());
741
742 let group_label = group_label_map
743 .entry(task.compaction_group_id)
744 .or_insert_with(|| task.compaction_group_id.to_string());
745
746 metrics
747 .compact_frequency
748 .with_label_values(&["normal", group_label, task_type_label, task_status_label])
749 .inc();
750 }
751
752 group_label_map.keys().for_each(|group_id| {
753 trigger_sst_stat(
754 metrics,
755 compact_status.get(group_id),
756 current_version,
757 *group_id,
758 );
759 });
760}