1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17use std::sync::atomic::Ordering;
18use std::time::{SystemTime, UNIX_EPOCH};
19
20use itertools::{Itertools, enumerate};
21use prometheus::IntGauge;
22use prometheus::core::{AtomicU64, GenericCounter};
23use risingwave_hummock_sdk::compact_task::CompactTask;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
25use risingwave_hummock_sdk::level::Levels;
26use risingwave_hummock_sdk::table_stats::PbTableStatsMap;
27use risingwave_hummock_sdk::version::HummockVersion;
28use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId, HummockVersionId};
29use risingwave_pb::hummock::write_limits::WriteLimit;
30use risingwave_pb::hummock::{
31 CompactionConfig, HummockPinnedVersion, HummockVersionStats, LevelType,
32};
33
34use super::compaction::selector::DynamicLevelSelectorCore;
35use super::compaction::{CompactionDeveloperConfig, get_compression_algorithm};
36use crate::hummock::checkpoint::HummockVersionCheckpoint;
37use crate::hummock::compaction::CompactStatus;
38use crate::rpc::metrics::MetaMetrics;
39
/// Cached per-table metric handles plus a small write-throughput accumulator,
/// so hot paths don't resolve prometheus label values on every report.
pub struct LocalTableMetrics {
    // Gauge for the table's total key count (from version stats).
    total_key_count: IntGauge,
    // Gauge for the table's total key size in bytes.
    total_key_size: IntGauge,
    // Gauge for the table's total value size in bytes.
    total_value_size: IntGauge,
    // Counter for write throughput; incremented in MiB batches (see
    // `inc_write_throughput`).
    write_throughput: GenericCounter<AtomicU64>,
    // Number of `inc_write_throughput` calls since the last flush.
    cal_count: usize,
    // Bytes accumulated since the last flush.
    write_size: u64,
}
48
// Flush the buffered write size into the throughput counter after this many
// accumulation calls ...
const MIN_FLUSH_COUNT: usize = 16;
// ... or once the buffered write size exceeds 128 MiB, whichever comes first.
const MIN_FLUSH_DATA_SIZE: u64 = 128 * 1024 * 1024;
51
52impl LocalTableMetrics {
53 pub fn inc_write_throughput(&mut self, val: u64) {
54 self.write_size += val;
55 self.cal_count += 1;
56 if self.write_size > MIN_FLUSH_DATA_SIZE || self.cal_count > MIN_FLUSH_COUNT {
57 self.write_throughput.inc_by(self.write_size / 1024 / 1024);
58 self.write_size = 0;
59 self.cal_count = 0;
60 }
61 }
62}
63
64pub fn get_or_create_local_table_stat<'a>(
65 metrics: &MetaMetrics,
66 table_id: u32,
67 local_metrics: &'a mut HashMap<u32, LocalTableMetrics>,
68) -> &'a mut LocalTableMetrics {
69 local_metrics.entry(table_id).or_insert_with(|| {
70 let table_label = format!("{}", table_id);
71 LocalTableMetrics {
72 total_key_count: metrics
73 .version_stats
74 .with_label_values(&[table_label.as_str(), "total_key_count"]),
75 total_key_size: metrics
76 .version_stats
77 .with_label_values(&[table_label.as_str(), "total_key_size"]),
78 total_value_size: metrics
79 .version_stats
80 .with_label_values(&[table_label.as_str(), "total_value_size"]),
81 write_throughput: metrics
82 .table_write_throughput
83 .with_label_values(&[&table_label]),
84 cal_count: 0,
85 write_size: 0,
86 }
87 })
88}
89
90pub fn trigger_local_table_stat(
91 metrics: &MetaMetrics,
92 local_metrics: &mut HashMap<u32, LocalTableMetrics>,
93 version_stats: &HummockVersionStats,
94 table_stats_change: &PbTableStatsMap,
95) {
96 for (table_id, stats) in table_stats_change {
97 if stats.total_key_size == 0 && stats.total_value_size == 0 && stats.total_key_count == 0 {
98 continue;
99 }
100 let table_metrics = get_or_create_local_table_stat(metrics, *table_id, local_metrics);
101 if let Some(table_stats) = version_stats.table_stats.get(table_id) {
102 table_metrics
103 .total_key_count
104 .set(table_stats.total_key_count);
105 table_metrics
106 .total_value_size
107 .set(table_stats.total_value_size);
108 table_metrics.total_key_size.set(table_stats.total_key_size);
109 }
110 }
111}
112
113pub fn trigger_mv_stat(
114 metrics: &MetaMetrics,
115 version_stats: &HummockVersionStats,
116 mv_id_to_all_table_ids: Vec<(u32, Vec<u32>)>,
117) {
118 metrics.materialized_view_stats.reset();
119 for (mv_id, all_table_ids) in mv_id_to_all_table_ids {
120 let total_size = all_table_ids
121 .iter()
122 .filter_map(|&table_id| version_stats.table_stats.get(&table_id))
123 .map(|stats| stats.total_key_size + stats.total_value_size)
124 .sum();
125
126 metrics
127 .materialized_view_stats
128 .with_label_values(&[mv_id.to_string().as_str(), "materialized_view_total_size"])
129 .set(total_size);
130 }
131}
132
/// Refreshes per-level SST metrics (file count, file size, pending-compaction
/// count) for one compaction group, publishes the pending compaction task
/// matrix, breaks down L0 sub-levels, and occasionally logs an LSM summary.
///
/// `compact_status` is optional: when absent (e.g. the group has no scheduler
/// state yet), only the version-derived gauges are updated.
pub fn trigger_sst_stat(
    metrics: &MetaMetrics,
    compact_status: Option<&CompactStatus>,
    current_version: &HummockVersion,
    compaction_group_id: CompactionGroupId,
) {
    // Number of SSTs residing at `level_idx` within this group.
    let level_sst_cnt = |level_idx: usize| {
        let mut sst_num = 0;
        current_version.level_iter(compaction_group_id, |level| {
            if level.level_idx == level_idx as u32 {
                sst_num += level.table_infos.len();
            }
            true
        });
        sst_num
    };
    // Total file size at `level_idx` within this group, in KiB.
    let level_sst_size = |level_idx: usize| {
        let mut level_sst_size = 0;
        current_version.level_iter(compaction_group_id, |level| {
            if level.level_idx == level_idx as u32 {
                level_sst_size += level.total_file_size;
            }
            true
        });
        level_sst_size / 1024
    };

    // (select_level, target_level) -> number of distinct pending tasks.
    let mut compacting_task_stat: BTreeMap<(usize, usize), usize> = BTreeMap::default();
    for idx in 0..current_version.num_levels(compaction_group_id) {
        let sst_num = level_sst_cnt(idx);
        let level_label = build_level_metrics_label(compaction_group_id, idx);
        metrics
            .level_sst_num
            .with_label_values(&[&level_label])
            .set(sst_num as i64);
        metrics
            .level_file_size
            .with_label_values(&[&level_label])
            .set(level_sst_size(idx) as i64);
        if let Some(compact_status) = compact_status {
            let compact_cnt = compact_status.level_handlers[idx].pending_file_count();
            metrics
                .level_compact_cnt
                .with_label_values(&[&level_label])
                .set(compact_cnt as i64);

            let compacting_task = compact_status.level_handlers[idx].pending_tasks();
            let mut pending_task_ids: HashSet<u64> = HashSet::default();
            for task in compacting_task {
                // Count each task only once even if a handler reports it
                // multiple times.
                if pending_task_ids.contains(&task.task_id) {
                    continue;
                }

                // A non-L0 task also shows up under its target level's
                // handler; skip that entry so the task is attributed to its
                // select level only.
                if idx != 0 && idx == task.target_level as usize {
                    continue;
                }

                let key = (idx, task.target_level as usize);
                let count = compacting_task_stat.entry(key).or_insert(0);
                *count += 1;

                pending_task_ids.insert(task.task_id);
            }
        }
    }

    tracing::debug!("LSM Compacting STAT {:?}", compacting_task_stat);
    for ((select, target), compacting_task_count) in &compacting_task_stat {
        let label_str =
            build_compact_task_stat_metrics_label(compaction_group_id, *select, *target);
        metrics
            .level_compact_task_cnt
            .with_label_values(&[&label_str])
            .set(*compacting_task_count as _);
    }

    // Nothing pending at all: clear any stale (select, target) series so they
    // don't keep reporting their last value.
    if compacting_task_stat.is_empty()
        && let Some(levels) = current_version.levels.get(&compaction_group_id)
    {
        let max_level = levels.levels.len();
        remove_compacting_task_stat(metrics, compaction_group_id, max_level);
    }

    {
        // L0 sub-level breakdown: overlapping vs. non-overlapping vs.
        // vnode-partitioned (the latter is a subset of non-overlapping).
        let overlapping_level_label =
            build_compact_task_l0_stat_metrics_label(compaction_group_id, true, false);
        let non_overlap_level_label =
            build_compact_task_l0_stat_metrics_label(compaction_group_id, false, false);
        let partition_level_label =
            build_compact_task_l0_stat_metrics_label(compaction_group_id, true, true);

        let overlapping_sst_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| sub_level.level_type == LevelType::Overlapping)
                    .count()
            })
            .unwrap_or(0);

        let non_overlap_sst_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| sub_level.level_type == LevelType::Nonoverlapping)
                    .count()
            })
            .unwrap_or(0);

        let partition_level_num = current_version
            .levels
            .get(&compaction_group_id)
            .map(|level| {
                level
                    .l0
                    .sub_levels
                    .iter()
                    .filter(|sub_level| {
                        sub_level.level_type == LevelType::Nonoverlapping
                            && sub_level.vnode_partition_count > 0
                    })
                    .count()
            })
            .unwrap_or(0);
        metrics
            .level_sst_num
            .with_label_values(&[&overlapping_level_label])
            .set(overlapping_sst_num as i64);

        metrics
            .level_sst_num
            .with_label_values(&[&non_overlap_level_label])
            .set(non_overlap_sst_num as i64);

        metrics
            .level_sst_num
            .with_label_values(&[&partition_level_label])
            .set(partition_level_num as i64);
    }

    // Rate-limited summary log: at most once per 600s, with a CAS on the
    // shared timestamp so only one concurrent caller wins the log slot.
    let previous_time = metrics.time_after_last_observation.load(Ordering::Relaxed);
    let current_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    if current_time > 600 + previous_time
        && metrics
            .time_after_last_observation
            .compare_exchange(
                previous_time,
                current_time,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .is_ok()
        && let Some(compact_status) = compact_status
    {
        for (idx, level_handler) in enumerate(compact_status.level_handlers.iter()) {
            let sst_num = level_sst_cnt(idx);
            let sst_size = level_sst_size(idx);
            let compact_cnt = level_handler.pending_file_count();
            tracing::info!(
                "Level {} has {} SSTs, the total size of which is {}KB, while {} of those are being compacted to bottom levels",
                idx,
                sst_num,
                sst_size,
                compact_cnt,
            );
        }
    }
}
313
314pub fn trigger_epoch_stat(metrics: &MetaMetrics, version: &HummockVersion) {
315 metrics.max_committed_epoch.set(
316 version
317 .state_table_info
318 .info()
319 .values()
320 .map(|i| i.committed_epoch)
321 .max()
322 .unwrap_or(0) as _,
323 );
324 metrics.min_committed_epoch.set(
325 version
326 .state_table_info
327 .info()
328 .values()
329 .map(|i| i.committed_epoch)
330 .min()
331 .unwrap_or(0) as _,
332 );
333}
334
335pub fn remove_compaction_group_in_sst_stat(
336 metrics: &MetaMetrics,
337 compaction_group_id: CompactionGroupId,
338 max_level: usize,
339) {
340 let mut idx = 0;
341 loop {
342 let level_label = build_level_metrics_label(compaction_group_id, idx);
343 let should_continue = metrics
344 .level_sst_num
345 .remove_label_values(&[&level_label])
346 .is_ok();
347
348 metrics
349 .level_file_size
350 .remove_label_values(&[&level_label])
351 .ok();
352
353 metrics
354 .level_compact_cnt
355 .remove_label_values(&[&level_label])
356 .ok();
357 if !should_continue {
358 break;
359 }
360 idx += 1;
361 }
362
363 let overlapping_level_label = build_level_l0_metrics_label(compaction_group_id, true);
364 let non_overlap_level_label = build_level_l0_metrics_label(compaction_group_id, false);
365 metrics
366 .level_sst_num
367 .remove_label_values(&[&overlapping_level_label])
368 .ok();
369 metrics
370 .level_sst_num
371 .remove_label_values(&[&non_overlap_level_label])
372 .ok();
373
374 remove_compacting_task_stat(metrics, compaction_group_id, max_level);
375 remove_split_stat(metrics, compaction_group_id);
376 remove_compact_task_metrics(metrics, compaction_group_id, max_level);
377 remove_compaction_group_stat(metrics, compaction_group_id);
378}
379
380pub fn remove_compacting_task_stat(
381 metrics: &MetaMetrics,
382 compaction_group_id: CompactionGroupId,
383 max_level: usize,
384) {
385 for select_level in 0..=max_level {
386 for target_level in 0..=max_level {
387 let label_str = build_compact_task_stat_metrics_label(
388 compaction_group_id,
389 select_level,
390 target_level,
391 );
392 metrics
393 .level_compact_task_cnt
394 .remove_label_values(&[&label_str])
395 .ok();
396 }
397 }
398}
399
400pub fn remove_split_stat(metrics: &MetaMetrics, compaction_group_id: CompactionGroupId) {
401 let label_str = compaction_group_id.to_string();
402 metrics
403 .state_table_count
404 .remove_label_values(&[&label_str])
405 .ok();
406
407 metrics
408 .branched_sst_count
409 .remove_label_values(&[&label_str])
410 .ok();
411}
412
413pub fn remove_compaction_group_stat(metrics: &MetaMetrics, compaction_group_id: CompactionGroupId) {
414 let label_str = compaction_group_id.to_string();
415 metrics
416 .compaction_group_size
417 .remove_label_values(&[&label_str])
418 .ok();
419 metrics
420 .compaction_group_file_count
421 .remove_label_values(&[&label_str])
422 .ok();
423 metrics
424 .compaction_group_throughput
425 .remove_label_values(&[&label_str])
426 .ok();
427
428 metrics
429 .split_compaction_group_count
430 .remove_label_values(&[&label_str])
431 .ok();
432
433 metrics
434 .merge_compaction_group_count
435 .remove_label_values(&[&label_str])
436 .ok();
437}
438
439pub fn trigger_pin_unpin_version_state(
440 metrics: &MetaMetrics,
441 pinned_versions: &BTreeMap<HummockContextId, HummockPinnedVersion>,
442) {
443 if let Some(m) = pinned_versions.values().map(|v| v.min_pinned_id).min() {
444 metrics.min_pinned_version_id.set(m as i64);
445 } else {
446 metrics
447 .min_pinned_version_id
448 .set(HummockVersionId::MAX.to_u64() as _);
449 }
450}
451
/// Publishes GC-related object gauges derived from the version checkpoint.
///
/// Stale objects are bucketed by their recorded version id against
/// `min_pinned_version_id`: at or below it they count as `stale_*`
/// (presumably reclaimable, since no context pins such an old version —
/// TODO confirm against GC logic), above it as `old_version_*`.
pub fn trigger_gc_stat(
    metrics: &MetaMetrics,
    checkpoint: &HummockVersionCheckpoint,
    min_pinned_version_id: HummockVersionId,
) {
    let current_version_object_size_map = object_size_map(&checkpoint.version);
    let current_version_object_size = current_version_object_size_map.values().sum::<u64>();
    let current_version_object_count = current_version_object_size_map.len();
    let mut old_version_object_size = 0;
    let mut old_version_object_count = 0;
    let mut stale_object_size = 0;
    let mut stale_object_count = 0;
    checkpoint.stale_objects.iter().for_each(|(id, objects)| {
        if *id <= min_pinned_version_id {
            stale_object_size += objects.total_file_size;
            stale_object_count += objects.id.len() as u64;
        } else {
            old_version_object_size += objects.total_file_size;
            old_version_object_count += objects.id.len() as u64;
        }
    });
    metrics
        .current_version_object_size
        .set(current_version_object_size as _);
    metrics
        .current_version_object_count
        .set(current_version_object_count as _);
    metrics
        .old_version_object_size
        .set(old_version_object_size as _);
    metrics
        .old_version_object_count
        .set(old_version_object_count as _);
    metrics.stale_object_size.set(stale_object_size as _);
    metrics.stale_object_count.set(stale_object_count as _);
    // Per-table change log: count and total file size of all old/new value
    // SSTs still referenced by the log.
    for (table_id, logs) in &checkpoint.version.table_change_log {
        let object_count = logs
            .iter()
            .map(|l| l.old_value.len() + l.new_value.len())
            .sum::<usize>();
        let object_size = logs
            .iter()
            .map(|l| {
                l.old_value
                    .iter()
                    .chain(l.new_value.iter())
                    .map(|s| s.file_size as usize)
                    .sum::<usize>()
            })
            .sum::<usize>();
        metrics
            .table_change_log_object_count
            .with_label_values(&[&format!("{table_id}")])
            .set(object_count as _);
        metrics
            .table_change_log_object_size
            .with_label_values(&[&format!("{table_id}")])
            .set(object_size as _);
    }
}
513
514pub fn trigger_lsm_stat(
516 metrics: &MetaMetrics,
517 compaction_config: Arc<CompactionConfig>,
518 levels: &Levels,
519 compaction_group_id: CompactionGroupId,
520) {
521 let group_label = compaction_group_id.to_string();
522 let dynamic_level_core = DynamicLevelSelectorCore::new(
525 compaction_config.clone(),
526 Arc::new(CompactionDeveloperConfig::default()),
527 );
528 let ctx = dynamic_level_core.calculate_level_base_size(levels);
529 {
530 let compact_pending_bytes_needed =
531 dynamic_level_core.compact_pending_bytes_needed_with_ctx(levels, &ctx);
532
533 metrics
534 .compact_pending_bytes
535 .with_label_values(&[&group_label])
536 .set(compact_pending_bytes_needed as _);
537 }
538
539 {
540 let level_compression_ratio = levels
542 .levels
543 .iter()
544 .map(|level| {
545 let ratio = if level.uncompressed_file_size == 0 {
546 0.0
547 } else {
548 level.total_file_size as f64 / level.uncompressed_file_size as f64
549 };
550
551 (level.level_idx, ratio)
552 })
553 .collect_vec();
554
555 for (level_index, compression_ratio) in level_compression_ratio {
556 let compression_algorithm_label = get_compression_algorithm(
557 compaction_config.as_ref(),
558 ctx.base_level,
559 level_index as usize,
560 );
561
562 metrics
563 .compact_level_compression_ratio
564 .with_label_values(&[
565 &group_label,
566 &level_index.to_string(),
567 &compression_algorithm_label,
568 ])
569 .set(compression_ratio);
570 }
571 }
572}
573
574pub fn trigger_write_stop_stats(
575 metrics: &MetaMetrics,
576 write_limit: &HashMap<CompactionGroupId, WriteLimit>,
577) {
578 metrics.write_stop_compaction_groups.reset();
579 for cg in write_limit.keys() {
580 metrics
581 .write_stop_compaction_groups
582 .with_label_values(&[&cg.to_string()])
583 .set(1);
584 }
585}
586
587pub fn trigger_split_stat(metrics: &MetaMetrics, version: &HummockVersion) {
588 let branched_ssts = version.build_branched_sst_info();
589
590 for compaction_group_id in version.levels.keys() {
591 let group_label = compaction_group_id.to_string();
592 metrics
593 .state_table_count
594 .with_label_values(&[&group_label])
595 .set(
596 version
597 .state_table_info
598 .compaction_group_member_table_ids(*compaction_group_id)
599 .len() as _,
600 );
601
602 let branched_sst_count: usize = branched_ssts
603 .values()
604 .map(|branched_map| {
605 branched_map
606 .keys()
607 .filter(|group_id| *group_id == compaction_group_id)
608 .count()
609 })
610 .sum();
611
612 metrics
613 .branched_sst_count
614 .with_label_values(&[&group_label])
615 .set(branched_sst_count as _);
616 }
617}
618
/// Label for per-level metrics of a compaction group: `cg{group}_L{level}`.
pub fn build_level_metrics_label(compaction_group_id: u64, level_idx: usize) -> String {
    format!("cg{compaction_group_id}_L{level_idx}")
}
622
/// Label for the L0 sub-level breakdown of a compaction group, distinguishing
/// overlapping from non-overlapping sub-levels.
pub fn build_level_l0_metrics_label(compaction_group_id: u64, overlapping: bool) -> String {
    let kind = if overlapping {
        "overlapping"
    } else {
        "non_overlap"
    };
    format!("cg{compaction_group_id}_l0_sub_{kind}")
}
630
/// Label for the pending-task count of a (select, target) level pair:
/// `cg{group} L{select} -> L{target}`.
pub fn build_compact_task_stat_metrics_label(
    compaction_group_id: u64,
    select_level: usize,
    target_level: usize,
) -> String {
    format!("cg{compaction_group_id} L{select_level} -> L{target_level}")
}
641
/// Label for the L0 sub-level gauges; `partition` takes precedence over
/// `overlapping` (a partitioned sub-level is reported as `partition`).
pub fn build_compact_task_l0_stat_metrics_label(
    compaction_group_id: u64,
    overlapping: bool,
    partition: bool,
) -> String {
    let kind = if partition {
        "partition"
    } else if overlapping {
        "overlapping"
    } else {
        "non_overlap"
    };
    format!("cg{compaction_group_id}_l0_sub_{kind}")
}
655
/// Group-agnostic level-pair label: `L{select}->L{target}`.
pub fn build_compact_task_level_type_metrics_label(
    select_level: usize,
    target_level: usize,
) -> String {
    format!("L{select_level}->L{target_level}")
}
662
663pub fn remove_compact_task_metrics(
664 metrics: &MetaMetrics,
665 compaction_group_id: CompactionGroupId,
666 max_level: usize,
667) {
668 for select_level in 0..=max_level {
669 for target_level in 0..=max_level {
670 let level_type_label =
671 build_compact_task_level_type_metrics_label(select_level, target_level);
672 let should_continue = metrics
673 .l0_compact_level_count
674 .remove_label_values(&[&compaction_group_id.to_string(), &level_type_label])
675 .is_ok();
676
677 metrics
678 .compact_task_size
679 .remove_label_values(&[&compaction_group_id.to_string(), &level_type_label])
680 .ok();
681
682 metrics
683 .compact_task_size
684 .remove_label_values(&[
685 &compaction_group_id.to_string(),
686 &format!("{} uncompressed", level_type_label),
687 ])
688 .ok();
689 if !should_continue {
690 break;
691 }
692 }
693 }
694}
695
696pub fn trigger_compact_tasks_stat(
697 metrics: &MetaMetrics,
698 compact_tasks: &[CompactTask],
699 compact_status: &BTreeMap<CompactionGroupId, CompactStatus>,
700 current_version: &HummockVersion,
701) {
702 let mut task_status_label_map = HashMap::new();
703 let mut task_type_label_map = HashMap::new();
704 let mut group_label_map = HashMap::new();
705
706 for task in compact_tasks {
707 let task_status_label = task_status_label_map
708 .entry(task.task_status)
709 .or_insert_with(|| task.task_status.as_str_name().to_owned());
710
711 let task_type_label = task_type_label_map
712 .entry(task.task_type)
713 .or_insert_with(|| task.task_type.as_str_name().to_owned());
714
715 let group_label = group_label_map
716 .entry(task.compaction_group_id)
717 .or_insert_with(|| task.compaction_group_id.to_string());
718
719 metrics
720 .compact_frequency
721 .with_label_values(&["normal", group_label, task_type_label, task_status_label])
722 .inc();
723 }
724
725 group_label_map.keys().for_each(|group_id| {
726 trigger_sst_stat(
727 metrics,
728 compact_status.get(group_id),
729 current_version,
730 *group_id,
731 );
732 });
733}