1use std::collections::HashMap;
16use std::sync::atomic::AtomicU64;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicF64, GenericGaugeVec};
21use prometheus::{
22 GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
23 exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
24 register_histogram_vec_with_registry, register_histogram_with_registry,
25 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
26 register_int_gauge_with_registry,
27};
28use risingwave_common::metrics::{
29 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
30 LabelGuardedUintGaugeVec,
31};
32use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::{
35 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
36 register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
37};
38use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
39use risingwave_meta_model::WorkerId;
40use risingwave_object_store::object::object_metrics::{
41 GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
42};
43use risingwave_pb::common::WorkerType;
44use thiserror_ext::AsReport;
45use tokio::sync::oneshot::Sender;
46use tokio::task::JoinHandle;
47
48use crate::controller::catalog::CatalogControllerRef;
49use crate::controller::cluster::ClusterControllerRef;
50use crate::controller::system_param::SystemParamsControllerRef;
51use crate::controller::utils::PartialFragmentStateTables;
52use crate::hummock::HummockManagerRef;
53use crate::manager::MetadataManager;
54use crate::rpc::ElectionClientRef;
55
56#[derive(Clone)]
57pub struct MetaMetrics {
58 pub worker_num: IntGaugeVec,
61 pub meta_type: IntGaugeVec,
63
64 pub grpc_latency: HistogramVec,
67
68 pub barrier_latency: LabelGuardedHistogramVec,
72 pub barrier_wait_commit_latency: Histogram,
74 pub barrier_send_latency: LabelGuardedHistogramVec,
76 pub all_barrier_nums: LabelGuardedIntGaugeVec,
79 pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
81 pub last_committed_barrier_time: IntGaugeVec,
83 pub barrier_interval_by_database: GaugeVec,
85
86 pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec, pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec, pub snapshot_backfill_lag: LabelGuardedIntGaugeVec, pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec, pub recovery_failure_cnt: IntCounterVec,
98 pub recovery_latency: HistogramVec,
99
100 pub max_committed_epoch: IntGauge,
103 pub min_committed_epoch: IntGauge,
105 pub level_sst_num: IntGaugeVec,
107 pub level_compact_cnt: IntGaugeVec,
109 pub compact_frequency: IntCounterVec,
111 pub level_file_size: IntGaugeVec,
113 pub version_size: IntGauge,
115 pub current_version_id: IntGauge,
117 pub checkpoint_version_id: IntGauge,
119 pub min_pinned_version_id: IntGauge,
121 pub min_safepoint_version_id: IntGauge,
123 pub write_stop_compaction_groups: IntGaugeVec,
125 pub full_gc_trigger_count: IntGauge,
127 pub full_gc_candidate_object_count: Histogram,
129 pub full_gc_selected_object_count: Histogram,
131 pub version_stats: IntGaugeVec,
133 pub materialized_view_stats: IntGaugeVec,
135 pub stale_object_count: IntGauge,
137 pub stale_object_size: IntGauge,
139 pub old_version_object_count: IntGauge,
141 pub old_version_object_size: IntGauge,
143 pub time_travel_object_count: IntGauge,
145 pub current_version_object_count: IntGauge,
147 pub current_version_object_size: IntGauge,
149 pub total_object_count: IntGauge,
151 pub total_object_size: IntGauge,
153 pub table_change_log_object_count: IntGaugeVec,
155 pub table_change_log_object_size: IntGaugeVec,
157 pub delta_log_count: IntGauge,
159 pub version_checkpoint_latency: Histogram,
161 pub hummock_manager_lock_time: HistogramVec,
163 pub hummock_manager_real_process_time: HistogramVec,
165 pub compact_skip_frequency: IntCounterVec,
167 pub compact_pending_bytes: IntGaugeVec,
169 pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
171 pub level_compact_task_cnt: IntGaugeVec,
173 pub time_after_last_observation: Arc<AtomicU64>,
174 pub l0_compact_level_count: HistogramVec,
175 pub compact_task_size: HistogramVec,
176 pub compact_task_file_count: HistogramVec,
177 pub compact_task_batch_count: HistogramVec,
178 pub split_compaction_group_count: IntCounterVec,
179 pub state_table_count: IntGaugeVec,
180 pub branched_sst_count: IntGaugeVec,
181 pub compact_task_trivial_move_sst_count: HistogramVec,
182
183 pub compaction_event_consumed_latency: Histogram,
184 pub compaction_event_loop_iteration_latency: Histogram,
185
186 pub object_store_metric: Arc<ObjectStoreMetrics>,
189
190 pub source_is_up: LabelGuardedIntGaugeVec,
193 pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,
194
195 pub actor_info: IntGaugeVec,
198 pub table_info: IntGaugeVec,
200 pub sink_info: IntGaugeVec,
202 pub relation_info: IntGaugeVec,
204
205 pub system_param_info: IntGaugeVec,
209
210 pub table_write_throughput: IntCounterVec,
212
213 pub merge_compaction_group_count: IntCounterVec,
215
216 pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
218 pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
219 pub auto_schema_change_latency: LabelGuardedHistogramVec,
220
221 pub time_travel_version_replay_latency: Histogram,
222
223 pub compaction_group_count: IntGauge,
224 pub compaction_group_size: IntGaugeVec,
225 pub compaction_group_file_count: IntGaugeVec,
226 pub compaction_group_throughput: IntGaugeVec,
227
228 pub refresh_job_duration: LabelGuardedUintGaugeVec,
230 pub refresh_job_finish_cnt: LabelGuardedIntCounterVec,
231 pub refresh_cron_job_trigger_cnt: LabelGuardedIntCounterVec,
232 pub refresh_cron_job_miss_cnt: LabelGuardedIntCounterVec,
233}
234
235pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
236 LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));
237
238impl MetaMetrics {
239 fn new(registry: &Registry) -> Self {
240 let opts = histogram_opts!(
241 "meta_grpc_duration_seconds",
242 "gRPC latency of meta services",
243 exponential_buckets(0.0001, 2.0, 20).unwrap() );
245 let grpc_latency =
246 register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();
247
248 let opts = histogram_opts!(
249 "meta_barrier_duration_seconds",
250 "barrier latency",
251 exponential_buckets(0.1, 1.5, 20).unwrap() );
253 let barrier_latency =
254 register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
255 .unwrap();
256
257 let opts = histogram_opts!(
258 "meta_barrier_wait_commit_duration_seconds",
259 "barrier_wait_commit_latency",
260 exponential_buckets(0.1, 1.5, 20).unwrap() );
262 let barrier_wait_commit_latency =
263 register_histogram_with_registry!(opts, registry).unwrap();
264
265 let opts = histogram_opts!(
266 "meta_barrier_send_duration_seconds",
267 "barrier send latency",
268 exponential_buckets(0.1, 1.5, 19).unwrap() );
270 let barrier_send_latency =
271 register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
272 .unwrap();
273 let barrier_interval_by_database = register_gauge_vec_with_registry!(
274 "meta_barrier_interval_by_database",
275 "barrier interval of each database",
276 &["database_id"],
277 registry
278 )
279 .unwrap();
280
281 let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
282 "all_barrier_nums",
283 "num of of all_barrier",
284 &["database_id"],
285 registry
286 )
287 .unwrap();
288 let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
289 "in_flight_barrier_nums",
290 "num of of in_flight_barrier",
291 &["database_id"],
292 registry
293 )
294 .unwrap();
295 let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
296 "last_committed_barrier_time",
297 "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
298 &["database_id"],
299 registry
300 )
301 .unwrap();
302
303 let opts = histogram_opts!(
305 "meta_snapshot_backfill_barrier_duration_seconds",
306 "snapshot backfill barrier latency",
307 exponential_buckets(0.1, 1.5, 20).unwrap() );
309 let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
310 opts,
311 &["table_id", "barrier_type"],
312 registry
313 )
314 .unwrap();
315 let opts = histogram_opts!(
316 "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
317 "snapshot backfill barrier_wait_commit_latency",
318 exponential_buckets(0.1, 1.5, 20).unwrap() );
320 let snapshot_backfill_wait_commit_latency =
321 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
322
323 let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
324 "meta_snapshot_backfill_upstream_lag",
325 "snapshot backfill upstream_lag",
326 &["table_id"],
327 registry
328 )
329 .unwrap();
330 let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
331 "meta_snapshot_backfill_inflight_barrier_num",
332 "snapshot backfill inflight_barrier_num",
333 &["table_id"],
334 registry
335 )
336 .unwrap();
337
338 let max_committed_epoch = register_int_gauge_with_registry!(
339 "storage_max_committed_epoch",
340 "max committed epoch",
341 registry
342 )
343 .unwrap();
344
345 let min_committed_epoch = register_int_gauge_with_registry!(
346 "storage_min_committed_epoch",
347 "min committed epoch",
348 registry
349 )
350 .unwrap();
351
352 let level_sst_num = register_int_gauge_vec_with_registry!(
353 "storage_level_sst_num",
354 "num of SSTs in each level",
355 &["level_index"],
356 registry
357 )
358 .unwrap();
359
360 let level_compact_cnt = register_int_gauge_vec_with_registry!(
361 "storage_level_compact_cnt",
362 "num of SSTs to be merged to next level in each level",
363 &["level_index"],
364 registry
365 )
366 .unwrap();
367
368 let compact_frequency = register_int_counter_vec_with_registry!(
369 "storage_level_compact_frequency",
370 "The number of compactions from one level to another level that have completed or failed.",
371 &["compactor", "group", "task_type", "result"],
372 registry
373 )
374 .unwrap();
375 let compact_skip_frequency = register_int_counter_vec_with_registry!(
376 "storage_skip_compact_frequency",
377 "The number of compactions from one level to another level that have been skipped.",
378 &["level", "type"],
379 registry
380 )
381 .unwrap();
382
383 let version_size =
384 register_int_gauge_with_registry!("storage_version_size", "version size", registry)
385 .unwrap();
386
387 let current_version_id = register_int_gauge_with_registry!(
388 "storage_current_version_id",
389 "current version id",
390 registry
391 )
392 .unwrap();
393
394 let checkpoint_version_id = register_int_gauge_with_registry!(
395 "storage_checkpoint_version_id",
396 "checkpoint version id",
397 registry
398 )
399 .unwrap();
400
401 let min_pinned_version_id = register_int_gauge_with_registry!(
402 "storage_min_pinned_version_id",
403 "min pinned version id",
404 registry
405 )
406 .unwrap();
407
408 let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
409 "storage_write_stop_compaction_groups",
410 "compaction groups of write stop state",
411 &["compaction_group_id"],
412 registry
413 )
414 .unwrap();
415
416 let full_gc_trigger_count = register_int_gauge_with_registry!(
417 "storage_full_gc_trigger_count",
418 "the number of attempts to trigger full GC",
419 registry
420 )
421 .unwrap();
422
423 let opts = histogram_opts!(
424 "storage_full_gc_candidate_object_count",
425 "the number of candidate object to delete after scanning object store",
426 exponential_buckets(1.0, 10.0, 6).unwrap()
427 );
428 let full_gc_candidate_object_count =
429 register_histogram_with_registry!(opts, registry).unwrap();
430
431 let opts = histogram_opts!(
432 "storage_full_gc_selected_object_count",
433 "the number of object to delete after filtering by meta node",
434 exponential_buckets(1.0, 10.0, 6).unwrap()
435 );
436 let full_gc_selected_object_count =
437 register_histogram_with_registry!(opts, registry).unwrap();
438
439 let min_safepoint_version_id = register_int_gauge_with_registry!(
440 "storage_min_safepoint_version_id",
441 "min safepoint version id",
442 registry
443 )
444 .unwrap();
445
446 let level_file_size = register_int_gauge_vec_with_registry!(
447 "storage_level_total_file_size",
448 "KBs total file bytes in each level",
449 &["level_index"],
450 registry
451 )
452 .unwrap();
453
454 let version_stats = register_int_gauge_vec_with_registry!(
455 "storage_version_stats",
456 "per table stats in current hummock version",
457 &["table_id", "metric"],
458 registry
459 )
460 .unwrap();
461
462 let materialized_view_stats = register_int_gauge_vec_with_registry!(
463 "storage_materialized_view_stats",
464 "per materialized view stats in current hummock version",
465 &["table_id", "metric"],
466 registry
467 )
468 .unwrap();
469
470 let stale_object_count = register_int_gauge_with_registry!(
471 "storage_stale_object_count",
472 "total number of objects that is no longer referenced by versions.",
473 registry
474 )
475 .unwrap();
476
477 let stale_object_size = register_int_gauge_with_registry!(
478 "storage_stale_object_size",
479 "total size of objects that is no longer referenced by versions.",
480 registry
481 )
482 .unwrap();
483
484 let old_version_object_count = register_int_gauge_with_registry!(
485 "storage_old_version_object_count",
486 "total number of objects that is still referenced by non-current versions",
487 registry
488 )
489 .unwrap();
490
491 let old_version_object_size = register_int_gauge_with_registry!(
492 "storage_old_version_object_size",
493 "total size of objects that is still referenced by non-current versions",
494 registry
495 )
496 .unwrap();
497
498 let current_version_object_count = register_int_gauge_with_registry!(
499 "storage_current_version_object_count",
500 "total number of objects that is referenced by current version",
501 registry
502 )
503 .unwrap();
504
505 let current_version_object_size = register_int_gauge_with_registry!(
506 "storage_current_version_object_size",
507 "total size of objects that is referenced by current version",
508 registry
509 )
510 .unwrap();
511
512 let total_object_count = register_int_gauge_with_registry!(
513 "storage_total_object_count",
514 "Total number of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
515 registry
516 ).unwrap();
517
518 let total_object_size = register_int_gauge_with_registry!(
519 "storage_total_object_size",
520 "Total size of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
521 registry
522 ).unwrap();
523
524 let table_change_log_object_count = register_int_gauge_vec_with_registry!(
525 "storage_table_change_log_object_count",
526 "per table change log object count",
527 &["table_id"],
528 registry
529 )
530 .unwrap();
531
532 let table_change_log_object_size = register_int_gauge_vec_with_registry!(
533 "storage_table_change_log_object_size",
534 "per table change log object size",
535 &["table_id"],
536 registry
537 )
538 .unwrap();
539
540 let time_travel_object_count = register_int_gauge_with_registry!(
541 "storage_time_travel_object_count",
542 "total number of objects that is referenced by time travel.",
543 registry
544 )
545 .unwrap();
546
547 let delta_log_count = register_int_gauge_with_registry!(
548 "storage_delta_log_count",
549 "total number of hummock version delta log",
550 registry
551 )
552 .unwrap();
553
554 let opts = histogram_opts!(
555 "storage_version_checkpoint_latency",
556 "hummock version checkpoint latency",
557 exponential_buckets(0.1, 1.5, 20).unwrap()
558 );
559 let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();
560
561 let hummock_manager_lock_time = register_histogram_vec_with_registry!(
562 "hummock_manager_lock_time",
563 "latency for hummock manager to acquire the rwlock",
564 &["lock_name", "lock_type"],
565 registry
566 )
567 .unwrap();
568
569 let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
570 "meta_hummock_manager_real_process_time",
571 "latency for hummock manager to really process the request",
572 &["method"],
573 registry
574 )
575 .unwrap();
576
577 let worker_num = register_int_gauge_vec_with_registry!(
578 "worker_num",
579 "number of nodes in the cluster",
580 &["worker_type"],
581 registry,
582 )
583 .unwrap();
584
585 let meta_type = register_int_gauge_vec_with_registry!(
586 "meta_num",
587 "role of meta nodes in the cluster",
588 &["worker_addr", "role"],
589 registry,
590 )
591 .unwrap();
592
593 let compact_pending_bytes = register_int_gauge_vec_with_registry!(
594 "storage_compact_pending_bytes",
595 "bytes of lsm tree needed to reach balance",
596 &["group"],
597 registry
598 )
599 .unwrap();
600
601 let compact_level_compression_ratio = register_gauge_vec_with_registry!(
602 "storage_compact_level_compression_ratio",
603 "compression ratio of each level of the lsm tree",
604 &["group", "level", "algorithm"],
605 registry
606 )
607 .unwrap();
608
609 let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
610 "storage_level_compact_task_cnt",
611 "num of compact_task organized by group and level",
612 &["task"],
613 registry
614 )
615 .unwrap();
616 let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
617
618 let recovery_failure_cnt = register_int_counter_vec_with_registry!(
619 "recovery_failure_cnt",
620 "Number of failed recovery attempts",
621 &["recovery_type"],
622 registry
623 )
624 .unwrap();
625 let opts = histogram_opts!(
626 "recovery_latency",
627 "Latency of the recovery process",
628 exponential_buckets(0.1, 1.5, 20).unwrap() );
630 let recovery_latency =
631 register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();
632
633 let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
634 "auto_schema_change_failure_cnt",
635 "Number of failed auto schema change",
636 &["table_id", "table_name"],
637 registry
638 )
639 .unwrap();
640
641 let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
642 "auto_schema_change_success_cnt",
643 "Number of success auto schema change",
644 &["table_id", "table_name"],
645 registry
646 )
647 .unwrap();
648
649 let opts = histogram_opts!(
650 "auto_schema_change_latency",
651 "Latency of the auto schema change process",
652 exponential_buckets(0.1, 1.5, 20).unwrap() );
654 let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
655 opts,
656 &["table_id", "table_name"],
657 registry
658 )
659 .unwrap();
660
661 let source_is_up = register_guarded_int_gauge_vec_with_registry!(
662 "source_status_is_up",
663 "source is up or not",
664 &["source_id", "source_name"],
665 registry
666 )
667 .unwrap();
668 let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());
669
670 let actor_info = register_int_gauge_vec_with_registry!(
671 "actor_info",
672 "Mapping from actor id to (fragment id, compute node)",
673 &["actor_id", "fragment_id", "compute_node"],
674 registry
675 )
676 .unwrap();
677
678 let table_info = register_int_gauge_vec_with_registry!(
679 "table_info",
680 "Mapping from table id to (actor id, table name)",
681 &[
682 "materialized_view_id",
683 "table_id",
684 "fragment_id",
685 "table_name",
686 "table_type",
687 "compaction_group_id"
688 ],
689 registry
690 )
691 .unwrap();
692
693 let sink_info = register_int_gauge_vec_with_registry!(
694 "sink_info",
695 "Mapping from actor id to (actor id, sink name)",
696 &["actor_id", "sink_id", "sink_name",],
697 registry
698 )
699 .unwrap();
700
701 let relation_info = register_int_gauge_vec_with_registry!(
702 "relation_info",
703 "Information of the database relation (table/source/sink)",
704 &["id", "database", "schema", "name", "resource_group", "type"],
705 registry
706 )
707 .unwrap();
708
709 let l0_compact_level_count = register_histogram_vec_with_registry!(
710 "storage_l0_compact_level_count",
711 "level_count of l0 compact task",
712 &["group", "type"],
713 registry
714 )
715 .unwrap();
716
717 let system_param_info = register_int_gauge_vec_with_registry!(
719 "system_param_info",
720 "Information of system parameters",
721 &["name", "value"],
722 registry
723 )
724 .unwrap();
725
726 let opts = histogram_opts!(
727 "storage_compact_task_size",
728 "Total size of compact that have been issued to state store",
729 exponential_buckets(1048576.0, 2.0, 16).unwrap()
730 );
731
732 let compact_task_size =
733 register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();
734
735 let compact_task_file_count = register_histogram_vec_with_registry!(
736 "storage_compact_task_file_count",
737 "file count of compact task",
738 &["group", "type"],
739 registry
740 )
741 .unwrap();
742 let opts = histogram_opts!(
743 "storage_compact_task_batch_count",
744 "count of compact task batch",
745 exponential_buckets(1.0, 2.0, 8).unwrap()
746 );
747 let compact_task_batch_count =
748 register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();
749
750 let table_write_throughput = register_int_counter_vec_with_registry!(
751 "storage_commit_write_throughput",
752 "The number of compactions from one level to another level that have been skipped.",
753 &["table_id"],
754 registry
755 )
756 .unwrap();
757
758 let split_compaction_group_count = register_int_counter_vec_with_registry!(
759 "storage_split_compaction_group_count",
760 "Count of trigger split compaction group",
761 &["group"],
762 registry
763 )
764 .unwrap();
765
766 let state_table_count = register_int_gauge_vec_with_registry!(
767 "storage_state_table_count",
768 "Count of stable table per compaction group",
769 &["group"],
770 registry
771 )
772 .unwrap();
773
774 let branched_sst_count = register_int_gauge_vec_with_registry!(
775 "storage_branched_sst_count",
776 "Count of branched sst per compaction group",
777 &["group"],
778 registry
779 )
780 .unwrap();
781
782 let opts = histogram_opts!(
783 "storage_compaction_event_consumed_latency",
784 "The latency(ms) of each event being consumed",
785 exponential_buckets(1.0, 1.5, 30).unwrap() );
787 let compaction_event_consumed_latency =
788 register_histogram_with_registry!(opts, registry).unwrap();
789
790 let opts = histogram_opts!(
791 "storage_compaction_event_loop_iteration_latency",
792 "The latency(ms) of each iteration of the compaction event loop",
793 exponential_buckets(1.0, 1.5, 30).unwrap() );
795 let compaction_event_loop_iteration_latency =
796 register_histogram_with_registry!(opts, registry).unwrap();
797
798 let merge_compaction_group_count = register_int_counter_vec_with_registry!(
799 "storage_merge_compaction_group_count",
800 "Count of trigger merge compaction group",
801 &["group"],
802 registry
803 )
804 .unwrap();
805
806 let opts = histogram_opts!(
807 "storage_time_travel_version_replay_latency",
808 "The latency(ms) of replaying a hummock version for time travel",
809 exponential_buckets(0.01, 10.0, 6).unwrap()
810 );
811 let time_travel_version_replay_latency =
812 register_histogram_with_registry!(opts, registry).unwrap();
813
814 let compaction_group_count = register_int_gauge_with_registry!(
815 "storage_compaction_group_count",
816 "The number of compaction groups",
817 registry,
818 )
819 .unwrap();
820
821 let compaction_group_size = register_int_gauge_vec_with_registry!(
822 "storage_compaction_group_size",
823 "The size of compaction group",
824 &["group"],
825 registry
826 )
827 .unwrap();
828
829 let compaction_group_file_count = register_int_gauge_vec_with_registry!(
830 "storage_compaction_group_file_count",
831 "The file count of compaction group",
832 &["group"],
833 registry
834 )
835 .unwrap();
836
837 let compaction_group_throughput = register_int_gauge_vec_with_registry!(
838 "storage_compaction_group_throughput",
839 "The throughput of compaction group",
840 &["group"],
841 registry
842 )
843 .unwrap();
844
845 let opts = histogram_opts!(
846 "storage_compact_task_trivial_move_sst_count",
847 "sst count of compact trivial-move task",
848 exponential_buckets(1.0, 2.0, 8).unwrap()
849 );
850 let compact_task_trivial_move_sst_count =
851 register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();
852
853 let refresh_job_duration = register_guarded_uint_gauge_vec_with_registry!(
854 "meta_refresh_job_duration",
855 "The duration of refresh job",
856 &["table_id", "status"],
857 registry
858 )
859 .unwrap();
860 let refresh_job_finish_cnt = register_guarded_int_counter_vec_with_registry!(
861 "meta_refresh_job_finish_cnt",
862 "The number of finished refresh jobs",
863 &["table_id", "status"],
864 registry
865 )
866 .unwrap();
867 let refresh_cron_job_trigger_cnt = register_guarded_int_counter_vec_with_registry!(
868 "meta_refresh_cron_job_trigger_cnt",
869 "The number of cron refresh jobs triggered",
870 &["table_id"],
871 registry
872 )
873 .unwrap();
874 let refresh_cron_job_miss_cnt = register_guarded_int_counter_vec_with_registry!(
875 "meta_refresh_cron_job_miss_cnt",
876 "The number of cron refresh jobs missed",
877 &["table_id"],
878 registry
879 )
880 .unwrap();
881
882 Self {
883 grpc_latency,
884 barrier_latency,
885 barrier_wait_commit_latency,
886 barrier_send_latency,
887 all_barrier_nums,
888 in_flight_barrier_nums,
889 last_committed_barrier_time,
890 barrier_interval_by_database,
891 snapshot_backfill_barrier_latency,
892 snapshot_backfill_wait_commit_latency,
893 snapshot_backfill_lag,
894 snapshot_backfill_inflight_barrier_num,
895 recovery_failure_cnt,
896 recovery_latency,
897
898 max_committed_epoch,
899 min_committed_epoch,
900 level_sst_num,
901 level_compact_cnt,
902 compact_frequency,
903 compact_skip_frequency,
904 level_file_size,
905 version_size,
906 version_stats,
907 materialized_view_stats,
908 stale_object_count,
909 stale_object_size,
910 old_version_object_count,
911 old_version_object_size,
912 time_travel_object_count,
913 current_version_object_count,
914 current_version_object_size,
915 total_object_count,
916 total_object_size,
917 table_change_log_object_count,
918 table_change_log_object_size,
919 delta_log_count,
920 version_checkpoint_latency,
921 current_version_id,
922 checkpoint_version_id,
923 min_pinned_version_id,
924 min_safepoint_version_id,
925 write_stop_compaction_groups,
926 full_gc_trigger_count,
927 full_gc_candidate_object_count,
928 full_gc_selected_object_count,
929 hummock_manager_lock_time,
930 hummock_manager_real_process_time,
931 time_after_last_observation: Arc::new(AtomicU64::new(0)),
932 worker_num,
933 meta_type,
934 compact_pending_bytes,
935 compact_level_compression_ratio,
936 level_compact_task_cnt,
937 object_store_metric,
938 source_is_up,
939 source_enumerator_metrics,
940 actor_info,
941 table_info,
942 sink_info,
943 relation_info,
944 system_param_info,
945 l0_compact_level_count,
946 compact_task_size,
947 compact_task_file_count,
948 compact_task_batch_count,
949 compact_task_trivial_move_sst_count,
950 table_write_throughput,
951 split_compaction_group_count,
952 state_table_count,
953 branched_sst_count,
954 compaction_event_consumed_latency,
955 compaction_event_loop_iteration_latency,
956 auto_schema_change_failure_cnt,
957 auto_schema_change_success_cnt,
958 auto_schema_change_latency,
959 merge_compaction_group_count,
960 time_travel_version_replay_latency,
961 compaction_group_count,
962 compaction_group_size,
963 compaction_group_file_count,
964 compaction_group_throughput,
965 refresh_job_duration,
966 refresh_job_finish_cnt,
967 refresh_cron_job_trigger_cnt,
968 refresh_cron_job_miss_cnt,
969 }
970 }
971
972 #[cfg(test)]
973 pub fn for_test(registry: &Registry) -> Self {
974 Self::new(registry)
975 }
976}
977impl Default for MetaMetrics {
978 fn default() -> Self {
979 GLOBAL_META_METRICS.clone()
980 }
981}
982
983pub async fn refresh_system_param_info_metrics(
985 system_params_controller: &SystemParamsControllerRef,
986 meta_metrics: Arc<MetaMetrics>,
987) {
988 let params_info = system_params_controller.get_params().await.get_all();
989
990 meta_metrics.system_param_info.reset();
991 for info in params_info {
992 meta_metrics
993 .system_param_info
994 .with_label_values(&[info.name, &info.value])
995 .set(1);
996 }
997}
998
999pub fn start_worker_info_monitor(
1000 metadata_manager: MetadataManager,
1001 election_client: ElectionClientRef,
1002 interval: Duration,
1003 meta_metrics: Arc<MetaMetrics>,
1004) -> (JoinHandle<()>, Sender<()>) {
1005 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1006 let join_handle = tokio::spawn(async move {
1007 let mut monitor_interval = tokio::time::interval(interval);
1008 monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1009 loop {
1010 tokio::select! {
1011 _ = monitor_interval.tick() => {},
1013 _ = &mut shutdown_rx => {
1015 tracing::info!("Worker number monitor is stopped");
1016 return;
1017 }
1018 }
1019
1020 let node_map = match metadata_manager.count_worker_node().await {
1021 Ok(node_map) => node_map,
1022 Err(err) => {
1023 tracing::warn!(error = %err.as_report(), "fail to count worker node");
1024 continue;
1025 }
1026 };
1027
1028 meta_metrics.worker_num.reset();
1030 meta_metrics.meta_type.reset();
1031
1032 for (worker_type, worker_num) in node_map {
1033 meta_metrics
1034 .worker_num
1035 .with_label_values(&[(worker_type.as_str_name())])
1036 .set(worker_num as i64);
1037 }
1038 if let Ok(meta_members) = election_client.get_members().await {
1039 meta_metrics
1040 .worker_num
1041 .with_label_values(&[WorkerType::Meta.as_str_name()])
1042 .set(meta_members.len() as i64);
1043 meta_members.into_iter().for_each(|m| {
1044 let role = if m.is_leader { "leader" } else { "follower" };
1045 meta_metrics
1046 .meta_type
1047 .with_label_values(&[m.id.as_str(), role])
1048 .set(1);
1049 });
1050 }
1051 }
1052 });
1053
1054 (join_handle, shutdown_tx)
1055}
1056
1057pub async fn refresh_fragment_info_metrics(
1058 catalog_controller: &CatalogControllerRef,
1059 cluster_controller: &ClusterControllerRef,
1060 hummock_manager: &HummockManagerRef,
1061 meta_metrics: Arc<MetaMetrics>,
1062) {
1063 let worker_nodes = match cluster_controller
1064 .list_workers(Some(WorkerType::ComputeNode.into()), None)
1065 .await
1066 {
1067 Ok(worker_nodes) => worker_nodes,
1068 Err(err) => {
1069 tracing::warn!(error=%err.as_report(), "fail to list worker node");
1070 return;
1071 }
1072 };
1073 let actor_locations = match catalog_controller.list_actor_locations() {
1074 Ok(actor_locations) => actor_locations,
1075 Err(err) => {
1076 tracing::warn!(error=%err.as_report(), "fail to get actor locations");
1077 return;
1078 }
1079 };
1080 let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
1081 Ok(sink_actor_mapping) => sink_actor_mapping,
1082 Err(err) => {
1083 tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
1084 return;
1085 }
1086 };
1087 let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
1088 Ok(fragment_state_tables) => fragment_state_tables,
1089 Err(err) => {
1090 tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
1091 return;
1092 }
1093 };
1094 let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
1095 Ok(mapping) => mapping,
1096 Err(err) => {
1097 tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
1098 return;
1099 }
1100 };
1101
1102 let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
1103 .into_iter()
1104 .map(|worker_node| {
1105 let addr = match worker_node.host {
1106 Some(host) => format!("{}:{}", host.host, host.port),
1107 None => "".to_owned(),
1108 };
1109 (worker_node.id, addr)
1110 })
1111 .collect();
1112 let table_compaction_group_id_mapping = hummock_manager
1113 .get_table_compaction_group_id_mapping()
1114 .await;
1115
1116 meta_metrics.actor_info.reset();
1119 meta_metrics.table_info.reset();
1120 meta_metrics.sink_info.reset();
1121 for actor_location in actor_locations {
1122 let actor_id_str = actor_location.actor_id.to_string();
1123 let fragment_id_str = actor_location.fragment_id.to_string();
1124 if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
1127 meta_metrics
1128 .actor_info
1129 .with_label_values(&[&actor_id_str, &fragment_id_str, address])
1130 .set(1);
1131 }
1132 }
1133 for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
1134 let sink_id_str = sink_id.to_string();
1135 for actor_id in actor_ids {
1136 let actor_id_str = actor_id.to_string();
1137 meta_metrics
1138 .sink_info
1139 .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
1140 .set(1);
1141 }
1142 }
1143 for PartialFragmentStateTables {
1144 fragment_id,
1145 job_id,
1146 state_table_ids,
1147 } in fragment_state_tables
1148 {
1149 let fragment_id_str = fragment_id.to_string();
1150 let job_id_str = job_id.to_string();
1151 for table_id in state_table_ids.into_inner() {
1152 let table_id_str = table_id.to_string();
1153 let (table_name, table_type) = table_name_and_type_mapping
1154 .get(&table_id)
1155 .cloned()
1156 .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
1157 let compaction_group_id = table_compaction_group_id_mapping
1158 .get(&table_id)
1159 .map(|cg_id| cg_id.to_string())
1160 .unwrap_or_else(|| "unknown".to_owned());
1161 meta_metrics
1162 .table_info
1163 .with_label_values(&[
1164 &job_id_str,
1165 &table_id_str,
1166 &fragment_id_str,
1167 &table_name,
1168 &table_type,
1169 &compaction_group_id,
1170 ])
1171 .set(1);
1172 }
1173 }
1174}
1175
1176pub async fn refresh_relation_info_metrics(
1177 catalog_controller: &CatalogControllerRef,
1178 meta_metrics: Arc<MetaMetrics>,
1179) {
1180 let table_objects = match catalog_controller.list_table_objects().await {
1181 Ok(table_objects) => table_objects,
1182 Err(err) => {
1183 tracing::warn!(error=%err.as_report(), "fail to get table objects");
1184 return;
1185 }
1186 };
1187
1188 let source_objects = match catalog_controller.list_source_objects().await {
1189 Ok(source_objects) => source_objects,
1190 Err(err) => {
1191 tracing::warn!(error=%err.as_report(), "fail to get source objects");
1192 return;
1193 }
1194 };
1195
1196 let sink_objects = match catalog_controller.list_sink_objects().await {
1197 Ok(sink_objects) => sink_objects,
1198 Err(err) => {
1199 tracing::warn!(error=%err.as_report(), "fail to get sink objects");
1200 return;
1201 }
1202 };
1203
1204 meta_metrics.relation_info.reset();
1205
1206 for (id, db, schema, name, resource_group) in table_objects {
1207 meta_metrics
1208 .relation_info
1209 .with_label_values(&[
1210 &id.to_string(),
1211 &db,
1212 &schema,
1213 &name,
1214 &resource_group,
1215 &"table".to_owned(),
1216 ])
1217 .set(1);
1218 }
1219
1220 for (id, db, schema, name, resource_group) in source_objects {
1221 meta_metrics
1222 .relation_info
1223 .with_label_values(&[
1224 &id.to_string(),
1225 &db,
1226 &schema,
1227 &name,
1228 &resource_group,
1229 &"source".to_owned(),
1230 ])
1231 .set(1);
1232 }
1233
1234 for (id, db, schema, name, resource_group) in sink_objects {
1235 meta_metrics
1236 .relation_info
1237 .with_label_values(&[
1238 &id.to_string(),
1239 &db,
1240 &schema,
1241 &name,
1242 &resource_group,
1243 &"sink".to_owned(),
1244 ])
1245 .set(1);
1246 }
1247}
1248
1249pub fn start_info_monitor(
1250 metadata_manager: MetadataManager,
1251 hummock_manager: HummockManagerRef,
1252 system_params_controller: SystemParamsControllerRef,
1253 meta_metrics: Arc<MetaMetrics>,
1254) -> (JoinHandle<()>, Sender<()>) {
1255 const COLLECT_INTERVAL_SECONDS: u64 = 60;
1256
1257 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1258 let join_handle = tokio::spawn(async move {
1259 let mut monitor_interval =
1260 tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
1261 monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1262 loop {
1263 tokio::select! {
1264 _ = monitor_interval.tick() => {},
1266 _ = &mut shutdown_rx => {
1268 tracing::info!("Meta info monitor is stopped");
1269 return;
1270 }
1271 }
1272
1273 refresh_fragment_info_metrics(
1275 &metadata_manager.catalog_controller,
1276 &metadata_manager.cluster_controller,
1277 &hummock_manager,
1278 meta_metrics.clone(),
1279 )
1280 .await;
1281
1282 refresh_relation_info_metrics(
1283 &metadata_manager.catalog_controller,
1284 meta_metrics.clone(),
1285 )
1286 .await;
1287
1288 refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
1290 .await;
1291 }
1292 });
1293
1294 (join_handle, shutdown_tx)
1295}