use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicF64, GenericGaugeVec};
use prometheus::{
    Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry, exponential_buckets,
    histogram_opts, register_gauge_vec_with_registry, register_histogram_vec_with_registry,
    register_histogram_with_registry, register_int_counter_vec_with_registry,
    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_meta_model::WorkerId;
use risingwave_object_store::object::object_metrics::{
    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
};
use risingwave_pb::common::WorkerType;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::ClusterControllerRef;
use crate::controller::utils::PartialFragmentStateTables;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::rpc::ElectionClientRef;

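/// Metrics collected by the meta node and exposed through a Prometheus [`Registry`].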
#[derive(Clone)]
pub struct MetaMetrics {
    // Cluster metrics.
    /// Number of nodes in the cluster, by worker type.
    pub worker_num: IntGaugeVec,
    /// Role (leader or follower) of each meta node in the cluster.
    pub meta_type: IntGaugeVec,

    // gRPC metrics.
    /// gRPC latency of meta services, by path.
    pub grpc_latency: HistogramVec,

    // Barrier metrics.
    /// Barrier latency, by database.
    pub barrier_latency: LabelGuardedHistogramVec<1>,
    /// Latency of waiting for barrier commit.
    pub barrier_wait_commit_latency: Histogram,
    /// Barrier send latency, by database.
    pub barrier_send_latency: LabelGuardedHistogramVec<1>,
    /// Number of all barriers, by database.
    pub all_barrier_nums: LabelGuardedIntGaugeVec<1>,
    /// Number of in-flight barriers, by database.
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec<1>,
    /// Timestamp (UNIX epoch seconds) of the last committed barrier's epoch time, by database.
    pub last_committed_barrier_time: IntGaugeVec,

    // Snapshot backfill metrics.
    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec<2>,
    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec<1>,
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec<1>,
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec<1>,

    // Recovery metrics.
    pub recovery_failure_cnt: IntCounterVec,
    pub recovery_latency: HistogramVec,

    // Hummock metrics.
    pub max_committed_epoch: IntGauge,
    pub min_committed_epoch: IntGauge,
    pub level_sst_num: IntGaugeVec,
    pub level_compact_cnt: IntGaugeVec,
    pub compact_frequency: IntCounterVec,
    pub level_file_size: IntGaugeVec,
    pub version_size: IntGauge,
    pub current_version_id: IntGauge,
    pub checkpoint_version_id: IntGauge,
    pub min_pinned_version_id: IntGauge,
    pub min_safepoint_version_id: IntGauge,
    pub write_stop_compaction_groups: IntGaugeVec,
    pub full_gc_trigger_count: IntGauge,
    pub full_gc_candidate_object_count: Histogram,
    pub full_gc_selected_object_count: Histogram,
    pub version_stats: IntGaugeVec,
    pub materialized_view_stats: IntGaugeVec,
    pub stale_object_count: IntGauge,
    pub stale_object_size: IntGauge,
    pub old_version_object_count: IntGauge,
    pub old_version_object_size: IntGauge,
    pub time_travel_object_count: IntGauge,
    pub current_version_object_count: IntGauge,
    pub current_version_object_size: IntGauge,
    pub total_object_count: IntGauge,
    pub total_object_size: IntGauge,
    pub table_change_log_object_count: IntGaugeVec,
    pub table_change_log_object_size: IntGaugeVec,
    pub delta_log_count: IntGauge,
    pub version_checkpoint_latency: Histogram,
    pub hummock_manager_lock_time: HistogramVec,
    pub hummock_manager_real_process_time: HistogramVec,
    pub compact_skip_frequency: IntCounterVec,
    pub compact_pending_bytes: IntGaugeVec,
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    pub level_compact_task_cnt: IntGaugeVec,
    pub time_after_last_observation: Arc<AtomicU64>,
    pub l0_compact_level_count: HistogramVec,
    pub compact_task_size: HistogramVec,
    pub compact_task_file_count: HistogramVec,
    pub compact_task_batch_count: HistogramVec,
    pub split_compaction_group_count: IntCounterVec,
    pub state_table_count: IntGaugeVec,
    pub branched_sst_count: IntGaugeVec,
    pub compact_task_trivial_move_sst_count: HistogramVec,

    pub compaction_event_consumed_latency: Histogram,
    pub compaction_event_loop_iteration_latency: Histogram,

    // Object store metrics.
    pub object_store_metric: Arc<ObjectStoreMetrics>,

    // Source metrics.
    pub source_is_up: LabelGuardedIntGaugeVec<2>,
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

    // Fragment info metrics, refreshed by `refresh_fragment_info_metrics`.
    pub actor_info: IntGaugeVec,
    pub table_info: IntGaugeVec,
    pub sink_info: IntGaugeVec,

    pub table_write_throughput: IntCounterVec,

    pub merge_compaction_group_count: IntCounterVec,

    // Auto schema change metrics.
    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec<2>,
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec<2>,
    pub auto_schema_change_latency: LabelGuardedHistogramVec<2>,

    pub time_travel_version_replay_latency: Histogram,

    pub compaction_group_count: IntGauge,
    pub compaction_group_size: IntGaugeVec,
    pub compaction_group_file_count: IntGaugeVec,
    pub compaction_group_throughput: IntGaugeVec,
}

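/// The global [`MetaMetrics`] instance, registered to [`GLOBAL_METRICS_REGISTRY`].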
pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl MetaMetrics {
    fn new(registry: &Registry) -> Self {
        let opts = histogram_opts!(
            "meta_grpc_duration_seconds",
            "gRPC latency of meta services",
            exponential_buckets(0.0001, 2.0, 20).unwrap()
        );
        let grpc_latency =
            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_duration_seconds",
            "barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let barrier_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();

        let opts = histogram_opts!(
            "meta_barrier_wait_commit_duration_seconds",
            "barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let barrier_wait_commit_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_send_duration_seconds",
            "barrier send latency",
            exponential_buckets(0.1, 1.5, 19).unwrap()
        );
        let barrier_send_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();

        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "all_barrier_nums",
            "num of all barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "in_flight_barrier_nums",
            "num of in-flight barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
            "last_committed_barrier_time",
            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
            &["database_id"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_duration_seconds",
            "snapshot backfill barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "barrier_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
            "snapshot backfill barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let snapshot_backfill_wait_commit_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();

        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_upstream_lag",
            "snapshot backfill upstream_lag",
            &["table_id"],
            registry
        )
        .unwrap();
        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_inflight_barrier_num",
            "snapshot backfill inflight_barrier_num",
            &["table_id"],
            registry
        )
        .unwrap();

        let max_committed_epoch = register_int_gauge_with_registry!(
            "storage_max_committed_epoch",
            "max committed epoch",
            registry
        )
        .unwrap();

        let min_committed_epoch = register_int_gauge_with_registry!(
            "storage_min_committed_epoch",
            "min committed epoch",
            registry
        )
        .unwrap();

        let level_sst_num = register_int_gauge_vec_with_registry!(
            "storage_level_sst_num",
            "num of SSTs in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let level_compact_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_cnt",
            "num of SSTs to be merged to next level in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let compact_frequency = register_int_counter_vec_with_registry!(
            "storage_level_compact_frequency",
            "The number of compactions from one level to another level that have completed or failed.",
            &["compactor", "group", "task_type", "result"],
            registry
        )
        .unwrap();
        let compact_skip_frequency = register_int_counter_vec_with_registry!(
            "storage_skip_compact_frequency",
            "The number of compactions from one level to another level that have been skipped.",
            &["level", "type"],
            registry
        )
        .unwrap();

        let version_size =
            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
                .unwrap();

        let current_version_id = register_int_gauge_with_registry!(
            "storage_current_version_id",
            "current version id",
            registry
        )
        .unwrap();

        let checkpoint_version_id = register_int_gauge_with_registry!(
            "storage_checkpoint_version_id",
            "checkpoint version id",
            registry
        )
        .unwrap();

        let min_pinned_version_id = register_int_gauge_with_registry!(
            "storage_min_pinned_version_id",
            "min pinned version id",
            registry
        )
        .unwrap();

        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
            "storage_write_stop_compaction_groups",
            "compaction groups of write stop state",
            &["compaction_group_id"],
            registry
        )
        .unwrap();

        let full_gc_trigger_count = register_int_gauge_with_registry!(
            "storage_full_gc_trigger_count",
            "the number of attempts to trigger full GC",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_candidate_object_count",
            "the number of candidate objects to delete after scanning object store",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_candidate_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_selected_object_count",
            "the number of objects to delete after filtering by meta node",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_selected_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let min_safepoint_version_id = register_int_gauge_with_registry!(
            "storage_min_safepoint_version_id",
            "min safepoint version id",
            registry
        )
        .unwrap();

        let level_file_size = register_int_gauge_vec_with_registry!(
            "storage_level_total_file_size",
            "total file size of each level, in KB",
            &["level_index"],
            registry
        )
        .unwrap();

        let version_stats = register_int_gauge_vec_with_registry!(
            "storage_version_stats",
            "per table stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let materialized_view_stats = register_int_gauge_vec_with_registry!(
            "storage_materialized_view_stats",
            "per materialized view stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let stale_object_count = register_int_gauge_with_registry!(
            "storage_stale_object_count",
            "total number of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let stale_object_size = register_int_gauge_with_registry!(
            "storage_stale_object_size",
            "total size of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let old_version_object_count = register_int_gauge_with_registry!(
            "storage_old_version_object_count",
            "total number of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let old_version_object_size = register_int_gauge_with_registry!(
            "storage_old_version_object_size",
            "total size of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let current_version_object_count = register_int_gauge_with_registry!(
            "storage_current_version_object_count",
            "total number of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let current_version_object_size = register_int_gauge_with_registry!(
            "storage_current_version_object_size",
            "total size of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let total_object_count = register_int_gauge_with_registry!(
            "storage_total_object_count",
            "Total number of objects, including dangling objects. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let total_object_size = register_int_gauge_with_registry!(
            "storage_total_object_size",
            "Total size of objects, including dangling objects. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_count",
            "per table change log object count",
            &["table_id"],
            registry
        )
        .unwrap();

        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_size",
            "per table change log object size",
            &["table_id"],
            registry
        )
        .unwrap();

        let time_travel_object_count = register_int_gauge_with_registry!(
            "storage_time_travel_object_count",
            "total number of objects that are referenced by time travel.",
            registry
        )
        .unwrap();

        let delta_log_count = register_int_gauge_with_registry!(
            "storage_delta_log_count",
            "total number of hummock version delta log",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_version_checkpoint_latency",
            "hummock version checkpoint latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let version_checkpoint_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
            "hummock_manager_lock_time",
            "latency for hummock manager to acquire the rwlock",
            &["lock_name", "lock_type"],
            registry
        )
        .unwrap();

        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
            "meta_hummock_manager_real_process_time",
            "latency for hummock manager to really process the request",
            &["method"],
            registry
        )
        .unwrap();

        let worker_num = register_int_gauge_vec_with_registry!(
            "worker_num",
            "number of nodes in the cluster",
            &["worker_type"],
            registry,
        )
        .unwrap();

        let meta_type = register_int_gauge_vec_with_registry!(
            "meta_num",
            "role of meta nodes in the cluster",
            &["worker_addr", "role"],
            registry,
        )
        .unwrap();

        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
            "storage_compact_pending_bytes",
            "bytes of lsm tree needed to reach balance",
            &["group"],
            registry
        )
        .unwrap();

        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
            "storage_compact_level_compression_ratio",
            "compression ratio of each level of the lsm tree",
            &["group", "level", "algorithm"],
            registry
        )
        .unwrap();

        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_task_cnt",
            "num of compact_task organized by group and level",
            &["task"],
            registry
        )
        .unwrap();
        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());

        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
            "recovery_failure_cnt",
            "Number of failed recovery attempts",
            &["recovery_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "recovery_latency",
            "Latency of the recovery process",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let recovery_latency =
            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();

        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_failure_cnt",
            "Number of failed auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_success_cnt",
            "Number of successful auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "auto_schema_change_latency",
            "Latency of the auto schema change process",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
            "source_status_is_up",
            "source is up or not",
            &["source_id", "source_name"],
            registry
        )
        .unwrap();
        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());

        let actor_info = register_int_gauge_vec_with_registry!(
            "actor_info",
            "Mapping from actor id to (fragment id, compute node)",
            &["actor_id", "fragment_id", "compute_node"],
            registry
        )
        .unwrap();

        let table_info = register_int_gauge_vec_with_registry!(
            "table_info",
            "Mapping from table id to (actor id, table name)",
            &[
                "materialized_view_id",
                "table_id",
                "fragment_id",
                "table_name",
                "table_type",
                "compaction_group_id"
            ],
            registry
        )
        .unwrap();

        let sink_info = register_int_gauge_vec_with_registry!(
            "sink_info",
            "Mapping from actor id to (sink id, sink name)",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let l0_compact_level_count = register_histogram_vec_with_registry!(
            "storage_l0_compact_level_count",
            "level_count of l0 compact task",
            &["group", "type"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_size",
            "Total size of compact tasks that have been issued to the state store",
            exponential_buckets(1048576.0, 2.0, 16).unwrap()
        );

        let compact_task_size =
            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();

        let compact_task_file_count = register_histogram_vec_with_registry!(
            "storage_compact_task_file_count",
            "file count of compact task",
            &["group", "type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "storage_compact_task_batch_count",
            "count of compact task batch",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_batch_count =
            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();

        let table_write_throughput = register_int_counter_vec_with_registry!(
            "storage_commit_write_throughput",
            "The write throughput of each table",
            &["table_id"],
            registry
        )
        .unwrap();

        let split_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_split_compaction_group_count",
            "Count of trigger split compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let state_table_count = register_int_gauge_vec_with_registry!(
            "storage_state_table_count",
            "Count of state tables per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let branched_sst_count = register_int_gauge_vec_with_registry!(
            "storage_branched_sst_count",
            "Count of branched sst per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_consumed_latency",
            "The latency(ms) of each event being consumed",
            exponential_buckets(1.0, 1.5, 30).unwrap()
        );
        let compaction_event_consumed_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_loop_iteration_latency",
            "The latency(ms) of each iteration of the compaction event loop",
            exponential_buckets(1.0, 1.5, 30).unwrap()
        );
        let compaction_event_loop_iteration_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_merge_compaction_group_count",
            "Count of trigger merge compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_time_travel_version_replay_latency",
            "The latency(ms) of replaying a hummock version for time travel",
            exponential_buckets(0.01, 10.0, 6).unwrap()
        );
        let time_travel_version_replay_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let compaction_group_count = register_int_gauge_with_registry!(
            "storage_compaction_group_count",
            "The number of compaction groups",
            registry,
        )
        .unwrap();

        let compaction_group_size = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_size",
            "The size of compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_file_count",
            "The file count of compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_throughput",
            "The throughput of compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_trivial_move_sst_count",
            "sst count of compact trivial-move task",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_trivial_move_sst_count =
            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();

        Self {
            grpc_latency,
            barrier_latency,
            barrier_wait_commit_latency,
            barrier_send_latency,
            all_barrier_nums,
            in_flight_barrier_nums,
            last_committed_barrier_time,
            snapshot_backfill_barrier_latency,
            snapshot_backfill_wait_commit_latency,
            snapshot_backfill_lag,
            snapshot_backfill_inflight_barrier_num,
            recovery_failure_cnt,
            recovery_latency,

            max_committed_epoch,
            min_committed_epoch,
            level_sst_num,
            level_compact_cnt,
            compact_frequency,
            compact_skip_frequency,
            level_file_size,
            version_size,
            version_stats,
            materialized_view_stats,
            stale_object_count,
            stale_object_size,
            old_version_object_count,
            old_version_object_size,
            time_travel_object_count,
            current_version_object_count,
            current_version_object_size,
            total_object_count,
            total_object_size,
            table_change_log_object_count,
            table_change_log_object_size,
            delta_log_count,
            version_checkpoint_latency,
            current_version_id,
            checkpoint_version_id,
            min_pinned_version_id,
            min_safepoint_version_id,
            write_stop_compaction_groups,
            full_gc_trigger_count,
            full_gc_candidate_object_count,
            full_gc_selected_object_count,
            hummock_manager_lock_time,
            hummock_manager_real_process_time,
            time_after_last_observation: Arc::new(AtomicU64::new(0)),
            worker_num,
            meta_type,
            compact_pending_bytes,
            compact_level_compression_ratio,
            level_compact_task_cnt,
            object_store_metric,
            source_is_up,
            source_enumerator_metrics,
            actor_info,
            table_info,
            sink_info,
            l0_compact_level_count,
            compact_task_size,
            compact_task_file_count,
            compact_task_batch_count,
            compact_task_trivial_move_sst_count,
            table_write_throughput,
            split_compaction_group_count,
            state_table_count,
            branched_sst_count,
            compaction_event_consumed_latency,
            compaction_event_loop_iteration_latency,
            auto_schema_change_failure_cnt,
            auto_schema_change_success_cnt,
            auto_schema_change_latency,
            merge_compaction_group_count,
            time_travel_version_replay_latency,
            compaction_group_count,
            compaction_group_size,
            compaction_group_file_count,
            compaction_group_throughput,
        }
    }

    #[cfg(test)]
    pub fn for_test(registry: &Registry) -> Self {
        Self::new(registry)
    }
}
impl Default for MetaMetrics {
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}

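/// Spawns a background task that periodically refreshes the `worker_num` and `meta_type` gauges
/// from the cluster metadata and the election client. Returns the task's join handle together
/// with a oneshot sender that stops the task when triggered.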
pub fn start_worker_info_monitor(
    metadata_manager: MetadataManager,
    election_client: ElectionClientRef,
    interval: Duration,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval = tokio::time::interval(interval);
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for the next interval tick.
                _ = monitor_interval.tick() => {},
                // Stop the monitor on shutdown.
                _ = &mut shutdown_rx => {
                    tracing::info!("Worker number monitor is stopped");
                    return;
                }
            }

            let node_map = match metadata_manager.count_worker_node().await {
                Ok(node_map) => node_map,
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
                    continue;
                }
            };

            // Reset the gauges first so that labels of workers that have left the cluster
            // do not linger.
            meta_metrics.worker_num.reset();
            meta_metrics.meta_type.reset();

            for (worker_type, worker_num) in node_map {
                meta_metrics
                    .worker_num
                    .with_label_values(&[(worker_type.as_str_name())])
                    .set(worker_num as i64);
            }
            if let Ok(meta_members) = election_client.get_members().await {
                meta_metrics
                    .worker_num
                    .with_label_values(&[WorkerType::Meta.as_str_name()])
                    .set(meta_members.len() as i64);
                meta_members.into_iter().for_each(|m| {
                    let role = if m.is_leader { "leader" } else { "follower" };
                    meta_metrics
                        .meta_type
                        .with_label_values(&[&m.id, role])
                        .set(1);
                });
            }
        }
    });

    (join_handle, shutdown_tx)
}

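/// Refreshes the `actor_info`, `table_info` and `sink_info` gauges from the catalog and cluster
/// controllers and the hummock manager. Existing series are reset first so that stale mappings
/// are dropped.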
pub async fn refresh_fragment_info_metrics(
    catalog_controller: &CatalogControllerRef,
    cluster_controller: &ClusterControllerRef,
    hummock_manager: &HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let worker_nodes = match cluster_controller
        .list_workers(Some(WorkerType::ComputeNode.into()), None)
        .await
    {
        Ok(worker_nodes) => worker_nodes,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list worker node");
            return;
        }
    };
    let actor_locations = match catalog_controller.list_actor_locations().await {
        Ok(actor_locations) => actor_locations,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
            return;
        }
    };
    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
        Ok(sink_actor_mapping) => sink_actor_mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
            return;
        }
    };
    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
        Ok(fragment_state_tables) => fragment_state_tables,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
            return;
        }
    };
    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await
    {
        Ok(mapping) => mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
            return;
        }
    };

    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
        .into_iter()
        .map(|worker_node| {
            let addr = match worker_node.host {
                Some(host) => format!("{}:{}", host.host, host.port),
                None => "".to_owned(),
            };
            (worker_node.id as WorkerId, addr)
        })
        .collect();
    let table_compaction_group_id_mapping = hummock_manager
        .get_table_compaction_group_id_mapping()
        .await;

    // Reset the gauges first so that series for dropped actors, tables and sinks do not linger.
    meta_metrics.actor_info.reset();
    meta_metrics.table_info.reset();
    meta_metrics.sink_info.reset();
    for actor_location in actor_locations {
        let actor_id_str = actor_location.actor_id.to_string();
        let fragment_id_str = actor_location.fragment_id.to_string();
        // Skip actors whose worker address is unknown.
        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
            meta_metrics
                .actor_info
                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
                .set(1);
        }
    }
    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
        let sink_id_str = sink_id.to_string();
        for actor_id in actor_ids {
            let actor_id_str = actor_id.to_string();
            meta_metrics
                .sink_info
                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
                .set(1);
        }
    }
    for PartialFragmentStateTables {
        fragment_id,
        job_id,
        state_table_ids,
    } in fragment_state_tables
    {
        let fragment_id_str = fragment_id.to_string();
        let job_id_str = job_id.to_string();
        for table_id in state_table_ids.into_inner() {
            let table_id_str = table_id.to_string();
            let (table_name, table_type) = table_name_and_type_mapping
                .get(&table_id)
                .cloned()
                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
            let compaction_group_id = table_compaction_group_id_mapping
                .get(&(table_id as u32))
                .map(|cg_id| cg_id.to_string())
                .unwrap_or_else(|| "unknown".to_owned());
            meta_metrics
                .table_info
                .with_label_values(&[
                    &job_id_str,
                    &table_id_str,
                    &fragment_id_str,
                    &table_name,
                    &table_type,
                    &compaction_group_id,
                ])
                .set(1);
        }
    }
}

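/// Spawns a background task that refreshes the fragment info metrics every
/// `COLLECT_INTERVAL_SECONDS`. Returns the task's join handle together with a oneshot sender
/// that stops the task when triggered.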
pub fn start_fragment_info_monitor(
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    const COLLECT_INTERVAL_SECONDS: u64 = 60;

    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval =
            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for the next interval tick.
                _ = monitor_interval.tick() => {},
                // Stop the monitor on shutdown.
                _ = &mut shutdown_rx => {
                    tracing::info!("Fragment info monitor is stopped");
                    return;
                }
            }

            refresh_fragment_info_metrics(
                &metadata_manager.catalog_controller,
                &metadata_manager.cluster_controller,
                &hummock_manager,
                meta_metrics.clone(),
            )
            .await;
        }
    });

    (join_handle, shutdown_tx)
}