1use std::collections::HashMap;
16use std::sync::atomic::AtomicU64;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicF64, GenericGaugeVec};
21use prometheus::{
22 GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
23 exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
24 register_histogram_vec_with_registry, register_histogram_with_registry,
25 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
26 register_int_gauge_with_registry,
27};
28use risingwave_common::metrics::{
29 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
30};
31use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
32use risingwave_common::system_param::reader::SystemParamsRead;
33use risingwave_common::{
34 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
35 register_guarded_int_gauge_vec_with_registry,
36};
37use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
38use risingwave_meta_model::WorkerId;
39use risingwave_object_store::object::object_metrics::{
40 GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
41};
42use risingwave_pb::common::WorkerType;
43use thiserror_ext::AsReport;
44use tokio::sync::oneshot::Sender;
45use tokio::task::JoinHandle;
46
47use crate::controller::catalog::CatalogControllerRef;
48use crate::controller::cluster::ClusterControllerRef;
49use crate::controller::system_param::SystemParamsControllerRef;
50use crate::controller::utils::PartialFragmentStateTables;
51use crate::hummock::HummockManagerRef;
52use crate::manager::MetadataManager;
53use crate::rpc::ElectionClientRef;
54
55#[derive(Clone)]
56pub struct MetaMetrics {
57 pub worker_num: IntGaugeVec,
60 pub meta_type: IntGaugeVec,
62
63 pub grpc_latency: HistogramVec,
66
67 pub barrier_latency: LabelGuardedHistogramVec,
71 pub barrier_wait_commit_latency: Histogram,
73 pub barrier_send_latency: LabelGuardedHistogramVec,
75 pub all_barrier_nums: LabelGuardedIntGaugeVec,
78 pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
80 pub last_committed_barrier_time: IntGaugeVec,
82 pub barrier_interval_by_database: GaugeVec,
84
85 pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec, pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec, pub snapshot_backfill_lag: LabelGuardedIntGaugeVec, pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec, pub recovery_failure_cnt: IntCounterVec,
97 pub recovery_latency: HistogramVec,
98
99 pub max_committed_epoch: IntGauge,
102 pub min_committed_epoch: IntGauge,
104 pub level_sst_num: IntGaugeVec,
106 pub level_compact_cnt: IntGaugeVec,
108 pub compact_frequency: IntCounterVec,
110 pub level_file_size: IntGaugeVec,
112 pub version_size: IntGauge,
114 pub current_version_id: IntGauge,
116 pub checkpoint_version_id: IntGauge,
118 pub min_pinned_version_id: IntGauge,
120 pub min_safepoint_version_id: IntGauge,
122 pub write_stop_compaction_groups: IntGaugeVec,
124 pub full_gc_trigger_count: IntGauge,
126 pub full_gc_candidate_object_count: Histogram,
128 pub full_gc_selected_object_count: Histogram,
130 pub version_stats: IntGaugeVec,
132 pub materialized_view_stats: IntGaugeVec,
134 pub stale_object_count: IntGauge,
136 pub stale_object_size: IntGauge,
138 pub old_version_object_count: IntGauge,
140 pub old_version_object_size: IntGauge,
142 pub time_travel_object_count: IntGauge,
144 pub current_version_object_count: IntGauge,
146 pub current_version_object_size: IntGauge,
148 pub total_object_count: IntGauge,
150 pub total_object_size: IntGauge,
152 pub table_change_log_object_count: IntGaugeVec,
154 pub table_change_log_object_size: IntGaugeVec,
156 pub delta_log_count: IntGauge,
158 pub version_checkpoint_latency: Histogram,
160 pub hummock_manager_lock_time: HistogramVec,
162 pub hummock_manager_real_process_time: HistogramVec,
164 pub compact_skip_frequency: IntCounterVec,
166 pub compact_pending_bytes: IntGaugeVec,
168 pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
170 pub level_compact_task_cnt: IntGaugeVec,
172 pub time_after_last_observation: Arc<AtomicU64>,
173 pub l0_compact_level_count: HistogramVec,
174 pub compact_task_size: HistogramVec,
175 pub compact_task_file_count: HistogramVec,
176 pub compact_task_batch_count: HistogramVec,
177 pub split_compaction_group_count: IntCounterVec,
178 pub state_table_count: IntGaugeVec,
179 pub branched_sst_count: IntGaugeVec,
180 pub compact_task_trivial_move_sst_count: HistogramVec,
181
182 pub compaction_event_consumed_latency: Histogram,
183 pub compaction_event_loop_iteration_latency: Histogram,
184
185 pub object_store_metric: Arc<ObjectStoreMetrics>,
188
189 pub source_is_up: LabelGuardedIntGaugeVec,
192 pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,
193
194 pub actor_info: IntGaugeVec,
197 pub table_info: IntGaugeVec,
199 pub sink_info: IntGaugeVec,
201 pub relation_info: IntGaugeVec,
203
204 pub system_param_info: IntGaugeVec,
208
209 pub table_write_throughput: IntCounterVec,
211
212 pub merge_compaction_group_count: IntCounterVec,
214
215 pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
217 pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
218 pub auto_schema_change_latency: LabelGuardedHistogramVec,
219
220 pub time_travel_version_replay_latency: Histogram,
221
222 pub compaction_group_count: IntGauge,
223 pub compaction_group_size: IntGaugeVec,
224 pub compaction_group_file_count: IntGaugeVec,
225 pub compaction_group_throughput: IntGaugeVec,
226}
227
228pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
229 LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));
230
231impl MetaMetrics {
232 fn new(registry: &Registry) -> Self {
233 let opts = histogram_opts!(
234 "meta_grpc_duration_seconds",
235 "gRPC latency of meta services",
236 exponential_buckets(0.0001, 2.0, 20).unwrap() );
238 let grpc_latency =
239 register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();
240
241 let opts = histogram_opts!(
242 "meta_barrier_duration_seconds",
243 "barrier latency",
244 exponential_buckets(0.1, 1.5, 20).unwrap() );
246 let barrier_latency =
247 register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
248 .unwrap();
249
250 let opts = histogram_opts!(
251 "meta_barrier_wait_commit_duration_seconds",
252 "barrier_wait_commit_latency",
253 exponential_buckets(0.1, 1.5, 20).unwrap() );
255 let barrier_wait_commit_latency =
256 register_histogram_with_registry!(opts, registry).unwrap();
257
258 let opts = histogram_opts!(
259 "meta_barrier_send_duration_seconds",
260 "barrier send latency",
261 exponential_buckets(0.1, 1.5, 19).unwrap() );
263 let barrier_send_latency =
264 register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
265 .unwrap();
266 let barrier_interval_by_database = register_gauge_vec_with_registry!(
267 "meta_barrier_interval_by_database",
268 "barrier interval of each database",
269 &["database_id"],
270 registry
271 )
272 .unwrap();
273
274 let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
275 "all_barrier_nums",
276 "num of of all_barrier",
277 &["database_id"],
278 registry
279 )
280 .unwrap();
281 let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
282 "in_flight_barrier_nums",
283 "num of of in_flight_barrier",
284 &["database_id"],
285 registry
286 )
287 .unwrap();
288 let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
289 "last_committed_barrier_time",
290 "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
291 &["database_id"],
292 registry
293 )
294 .unwrap();
295
296 let opts = histogram_opts!(
298 "meta_snapshot_backfill_barrier_duration_seconds",
299 "snapshot backfill barrier latency",
300 exponential_buckets(0.1, 1.5, 20).unwrap() );
302 let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
303 opts,
304 &["table_id", "barrier_type"],
305 registry
306 )
307 .unwrap();
308 let opts = histogram_opts!(
309 "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
310 "snapshot backfill barrier_wait_commit_latency",
311 exponential_buckets(0.1, 1.5, 20).unwrap() );
313 let snapshot_backfill_wait_commit_latency =
314 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
315
316 let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
317 "meta_snapshot_backfill_upstream_lag",
318 "snapshot backfill upstream_lag",
319 &["table_id"],
320 registry
321 )
322 .unwrap();
323 let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
324 "meta_snapshot_backfill_inflight_barrier_num",
325 "snapshot backfill inflight_barrier_num",
326 &["table_id"],
327 registry
328 )
329 .unwrap();
330
331 let max_committed_epoch = register_int_gauge_with_registry!(
332 "storage_max_committed_epoch",
333 "max committed epoch",
334 registry
335 )
336 .unwrap();
337
338 let min_committed_epoch = register_int_gauge_with_registry!(
339 "storage_min_committed_epoch",
340 "min committed epoch",
341 registry
342 )
343 .unwrap();
344
345 let level_sst_num = register_int_gauge_vec_with_registry!(
346 "storage_level_sst_num",
347 "num of SSTs in each level",
348 &["level_index"],
349 registry
350 )
351 .unwrap();
352
353 let level_compact_cnt = register_int_gauge_vec_with_registry!(
354 "storage_level_compact_cnt",
355 "num of SSTs to be merged to next level in each level",
356 &["level_index"],
357 registry
358 )
359 .unwrap();
360
361 let compact_frequency = register_int_counter_vec_with_registry!(
362 "storage_level_compact_frequency",
363 "The number of compactions from one level to another level that have completed or failed.",
364 &["compactor", "group", "task_type", "result"],
365 registry
366 )
367 .unwrap();
368 let compact_skip_frequency = register_int_counter_vec_with_registry!(
369 "storage_skip_compact_frequency",
370 "The number of compactions from one level to another level that have been skipped.",
371 &["level", "type"],
372 registry
373 )
374 .unwrap();
375
376 let version_size =
377 register_int_gauge_with_registry!("storage_version_size", "version size", registry)
378 .unwrap();
379
380 let current_version_id = register_int_gauge_with_registry!(
381 "storage_current_version_id",
382 "current version id",
383 registry
384 )
385 .unwrap();
386
387 let checkpoint_version_id = register_int_gauge_with_registry!(
388 "storage_checkpoint_version_id",
389 "checkpoint version id",
390 registry
391 )
392 .unwrap();
393
394 let min_pinned_version_id = register_int_gauge_with_registry!(
395 "storage_min_pinned_version_id",
396 "min pinned version id",
397 registry
398 )
399 .unwrap();
400
401 let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
402 "storage_write_stop_compaction_groups",
403 "compaction groups of write stop state",
404 &["compaction_group_id"],
405 registry
406 )
407 .unwrap();
408
409 let full_gc_trigger_count = register_int_gauge_with_registry!(
410 "storage_full_gc_trigger_count",
411 "the number of attempts to trigger full GC",
412 registry
413 )
414 .unwrap();
415
416 let opts = histogram_opts!(
417 "storage_full_gc_candidate_object_count",
418 "the number of candidate object to delete after scanning object store",
419 exponential_buckets(1.0, 10.0, 6).unwrap()
420 );
421 let full_gc_candidate_object_count =
422 register_histogram_with_registry!(opts, registry).unwrap();
423
424 let opts = histogram_opts!(
425 "storage_full_gc_selected_object_count",
426 "the number of object to delete after filtering by meta node",
427 exponential_buckets(1.0, 10.0, 6).unwrap()
428 );
429 let full_gc_selected_object_count =
430 register_histogram_with_registry!(opts, registry).unwrap();
431
432 let min_safepoint_version_id = register_int_gauge_with_registry!(
433 "storage_min_safepoint_version_id",
434 "min safepoint version id",
435 registry
436 )
437 .unwrap();
438
439 let level_file_size = register_int_gauge_vec_with_registry!(
440 "storage_level_total_file_size",
441 "KBs total file bytes in each level",
442 &["level_index"],
443 registry
444 )
445 .unwrap();
446
447 let version_stats = register_int_gauge_vec_with_registry!(
448 "storage_version_stats",
449 "per table stats in current hummock version",
450 &["table_id", "metric"],
451 registry
452 )
453 .unwrap();
454
455 let materialized_view_stats = register_int_gauge_vec_with_registry!(
456 "storage_materialized_view_stats",
457 "per materialized view stats in current hummock version",
458 &["table_id", "metric"],
459 registry
460 )
461 .unwrap();
462
463 let stale_object_count = register_int_gauge_with_registry!(
464 "storage_stale_object_count",
465 "total number of objects that is no longer referenced by versions.",
466 registry
467 )
468 .unwrap();
469
470 let stale_object_size = register_int_gauge_with_registry!(
471 "storage_stale_object_size",
472 "total size of objects that is no longer referenced by versions.",
473 registry
474 )
475 .unwrap();
476
477 let old_version_object_count = register_int_gauge_with_registry!(
478 "storage_old_version_object_count",
479 "total number of objects that is still referenced by non-current versions",
480 registry
481 )
482 .unwrap();
483
484 let old_version_object_size = register_int_gauge_with_registry!(
485 "storage_old_version_object_size",
486 "total size of objects that is still referenced by non-current versions",
487 registry
488 )
489 .unwrap();
490
491 let current_version_object_count = register_int_gauge_with_registry!(
492 "storage_current_version_object_count",
493 "total number of objects that is referenced by current version",
494 registry
495 )
496 .unwrap();
497
498 let current_version_object_size = register_int_gauge_with_registry!(
499 "storage_current_version_object_size",
500 "total size of objects that is referenced by current version",
501 registry
502 )
503 .unwrap();
504
505 let total_object_count = register_int_gauge_with_registry!(
506 "storage_total_object_count",
507 "Total number of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
508 registry
509 ).unwrap();
510
511 let total_object_size = register_int_gauge_with_registry!(
512 "storage_total_object_size",
513 "Total size of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
514 registry
515 ).unwrap();
516
517 let table_change_log_object_count = register_int_gauge_vec_with_registry!(
518 "storage_table_change_log_object_count",
519 "per table change log object count",
520 &["table_id"],
521 registry
522 )
523 .unwrap();
524
525 let table_change_log_object_size = register_int_gauge_vec_with_registry!(
526 "storage_table_change_log_object_size",
527 "per table change log object size",
528 &["table_id"],
529 registry
530 )
531 .unwrap();
532
533 let time_travel_object_count = register_int_gauge_with_registry!(
534 "storage_time_travel_object_count",
535 "total number of objects that is referenced by time travel.",
536 registry
537 )
538 .unwrap();
539
540 let delta_log_count = register_int_gauge_with_registry!(
541 "storage_delta_log_count",
542 "total number of hummock version delta log",
543 registry
544 )
545 .unwrap();
546
547 let opts = histogram_opts!(
548 "storage_version_checkpoint_latency",
549 "hummock version checkpoint latency",
550 exponential_buckets(0.1, 1.5, 20).unwrap()
551 );
552 let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();
553
554 let hummock_manager_lock_time = register_histogram_vec_with_registry!(
555 "hummock_manager_lock_time",
556 "latency for hummock manager to acquire the rwlock",
557 &["lock_name", "lock_type"],
558 registry
559 )
560 .unwrap();
561
562 let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
563 "meta_hummock_manager_real_process_time",
564 "latency for hummock manager to really process the request",
565 &["method"],
566 registry
567 )
568 .unwrap();
569
570 let worker_num = register_int_gauge_vec_with_registry!(
571 "worker_num",
572 "number of nodes in the cluster",
573 &["worker_type"],
574 registry,
575 )
576 .unwrap();
577
578 let meta_type = register_int_gauge_vec_with_registry!(
579 "meta_num",
580 "role of meta nodes in the cluster",
581 &["worker_addr", "role"],
582 registry,
583 )
584 .unwrap();
585
586 let compact_pending_bytes = register_int_gauge_vec_with_registry!(
587 "storage_compact_pending_bytes",
588 "bytes of lsm tree needed to reach balance",
589 &["group"],
590 registry
591 )
592 .unwrap();
593
594 let compact_level_compression_ratio = register_gauge_vec_with_registry!(
595 "storage_compact_level_compression_ratio",
596 "compression ratio of each level of the lsm tree",
597 &["group", "level", "algorithm"],
598 registry
599 )
600 .unwrap();
601
602 let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
603 "storage_level_compact_task_cnt",
604 "num of compact_task organized by group and level",
605 &["task"],
606 registry
607 )
608 .unwrap();
609 let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
610
611 let recovery_failure_cnt = register_int_counter_vec_with_registry!(
612 "recovery_failure_cnt",
613 "Number of failed recovery attempts",
614 &["recovery_type"],
615 registry
616 )
617 .unwrap();
618 let opts = histogram_opts!(
619 "recovery_latency",
620 "Latency of the recovery process",
621 exponential_buckets(0.1, 1.5, 20).unwrap() );
623 let recovery_latency =
624 register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();
625
626 let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
627 "auto_schema_change_failure_cnt",
628 "Number of failed auto schema change",
629 &["table_id", "table_name"],
630 registry
631 )
632 .unwrap();
633
634 let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
635 "auto_schema_change_success_cnt",
636 "Number of success auto schema change",
637 &["table_id", "table_name"],
638 registry
639 )
640 .unwrap();
641
642 let opts = histogram_opts!(
643 "auto_schema_change_latency",
644 "Latency of the auto schema change process",
645 exponential_buckets(0.1, 1.5, 20).unwrap() );
647 let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
648 opts,
649 &["table_id", "table_name"],
650 registry
651 )
652 .unwrap();
653
654 let source_is_up = register_guarded_int_gauge_vec_with_registry!(
655 "source_status_is_up",
656 "source is up or not",
657 &["source_id", "source_name"],
658 registry
659 )
660 .unwrap();
661 let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());
662
663 let actor_info = register_int_gauge_vec_with_registry!(
664 "actor_info",
665 "Mapping from actor id to (fragment id, compute node)",
666 &["actor_id", "fragment_id", "compute_node"],
667 registry
668 )
669 .unwrap();
670
671 let table_info = register_int_gauge_vec_with_registry!(
672 "table_info",
673 "Mapping from table id to (actor id, table name)",
674 &[
675 "materialized_view_id",
676 "table_id",
677 "fragment_id",
678 "table_name",
679 "table_type",
680 "compaction_group_id"
681 ],
682 registry
683 )
684 .unwrap();
685
686 let sink_info = register_int_gauge_vec_with_registry!(
687 "sink_info",
688 "Mapping from actor id to (actor id, sink name)",
689 &["actor_id", "sink_id", "sink_name",],
690 registry
691 )
692 .unwrap();
693
694 let relation_info = register_int_gauge_vec_with_registry!(
695 "relation_info",
696 "Information of the database relation (table/source/sink)",
697 &["id", "database", "schema", "name", "resource_group", "type"],
698 registry
699 )
700 .unwrap();
701
702 let l0_compact_level_count = register_histogram_vec_with_registry!(
703 "storage_l0_compact_level_count",
704 "level_count of l0 compact task",
705 &["group", "type"],
706 registry
707 )
708 .unwrap();
709
710 let system_param_info = register_int_gauge_vec_with_registry!(
712 "system_param_info",
713 "Information of system parameters",
714 &["name", "value"],
715 registry
716 )
717 .unwrap();
718
719 let opts = histogram_opts!(
720 "storage_compact_task_size",
721 "Total size of compact that have been issued to state store",
722 exponential_buckets(1048576.0, 2.0, 16).unwrap()
723 );
724
725 let compact_task_size =
726 register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();
727
728 let compact_task_file_count = register_histogram_vec_with_registry!(
729 "storage_compact_task_file_count",
730 "file count of compact task",
731 &["group", "type"],
732 registry
733 )
734 .unwrap();
735 let opts = histogram_opts!(
736 "storage_compact_task_batch_count",
737 "count of compact task batch",
738 exponential_buckets(1.0, 2.0, 8).unwrap()
739 );
740 let compact_task_batch_count =
741 register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();
742
743 let table_write_throughput = register_int_counter_vec_with_registry!(
744 "storage_commit_write_throughput",
745 "The number of compactions from one level to another level that have been skipped.",
746 &["table_id"],
747 registry
748 )
749 .unwrap();
750
751 let split_compaction_group_count = register_int_counter_vec_with_registry!(
752 "storage_split_compaction_group_count",
753 "Count of trigger split compaction group",
754 &["group"],
755 registry
756 )
757 .unwrap();
758
759 let state_table_count = register_int_gauge_vec_with_registry!(
760 "storage_state_table_count",
761 "Count of stable table per compaction group",
762 &["group"],
763 registry
764 )
765 .unwrap();
766
767 let branched_sst_count = register_int_gauge_vec_with_registry!(
768 "storage_branched_sst_count",
769 "Count of branched sst per compaction group",
770 &["group"],
771 registry
772 )
773 .unwrap();
774
775 let opts = histogram_opts!(
776 "storage_compaction_event_consumed_latency",
777 "The latency(ms) of each event being consumed",
778 exponential_buckets(1.0, 1.5, 30).unwrap() );
780 let compaction_event_consumed_latency =
781 register_histogram_with_registry!(opts, registry).unwrap();
782
783 let opts = histogram_opts!(
784 "storage_compaction_event_loop_iteration_latency",
785 "The latency(ms) of each iteration of the compaction event loop",
786 exponential_buckets(1.0, 1.5, 30).unwrap() );
788 let compaction_event_loop_iteration_latency =
789 register_histogram_with_registry!(opts, registry).unwrap();
790
791 let merge_compaction_group_count = register_int_counter_vec_with_registry!(
792 "storage_merge_compaction_group_count",
793 "Count of trigger merge compaction group",
794 &["group"],
795 registry
796 )
797 .unwrap();
798
799 let opts = histogram_opts!(
800 "storage_time_travel_version_replay_latency",
801 "The latency(ms) of replaying a hummock version for time travel",
802 exponential_buckets(0.01, 10.0, 6).unwrap()
803 );
804 let time_travel_version_replay_latency =
805 register_histogram_with_registry!(opts, registry).unwrap();
806
807 let compaction_group_count = register_int_gauge_with_registry!(
808 "storage_compaction_group_count",
809 "The number of compaction groups",
810 registry,
811 )
812 .unwrap();
813
814 let compaction_group_size = register_int_gauge_vec_with_registry!(
815 "storage_compaction_group_size",
816 "The size of compaction group",
817 &["group"],
818 registry
819 )
820 .unwrap();
821
822 let compaction_group_file_count = register_int_gauge_vec_with_registry!(
823 "storage_compaction_group_file_count",
824 "The file count of compaction group",
825 &["group"],
826 registry
827 )
828 .unwrap();
829
830 let compaction_group_throughput = register_int_gauge_vec_with_registry!(
831 "storage_compaction_group_throughput",
832 "The throughput of compaction group",
833 &["group"],
834 registry
835 )
836 .unwrap();
837
838 let opts = histogram_opts!(
839 "storage_compact_task_trivial_move_sst_count",
840 "sst count of compact trivial-move task",
841 exponential_buckets(1.0, 2.0, 8).unwrap()
842 );
843 let compact_task_trivial_move_sst_count =
844 register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();
845
846 Self {
847 grpc_latency,
848 barrier_latency,
849 barrier_wait_commit_latency,
850 barrier_send_latency,
851 all_barrier_nums,
852 in_flight_barrier_nums,
853 last_committed_barrier_time,
854 barrier_interval_by_database,
855 snapshot_backfill_barrier_latency,
856 snapshot_backfill_wait_commit_latency,
857 snapshot_backfill_lag,
858 snapshot_backfill_inflight_barrier_num,
859 recovery_failure_cnt,
860 recovery_latency,
861
862 max_committed_epoch,
863 min_committed_epoch,
864 level_sst_num,
865 level_compact_cnt,
866 compact_frequency,
867 compact_skip_frequency,
868 level_file_size,
869 version_size,
870 version_stats,
871 materialized_view_stats,
872 stale_object_count,
873 stale_object_size,
874 old_version_object_count,
875 old_version_object_size,
876 time_travel_object_count,
877 current_version_object_count,
878 current_version_object_size,
879 total_object_count,
880 total_object_size,
881 table_change_log_object_count,
882 table_change_log_object_size,
883 delta_log_count,
884 version_checkpoint_latency,
885 current_version_id,
886 checkpoint_version_id,
887 min_pinned_version_id,
888 min_safepoint_version_id,
889 write_stop_compaction_groups,
890 full_gc_trigger_count,
891 full_gc_candidate_object_count,
892 full_gc_selected_object_count,
893 hummock_manager_lock_time,
894 hummock_manager_real_process_time,
895 time_after_last_observation: Arc::new(AtomicU64::new(0)),
896 worker_num,
897 meta_type,
898 compact_pending_bytes,
899 compact_level_compression_ratio,
900 level_compact_task_cnt,
901 object_store_metric,
902 source_is_up,
903 source_enumerator_metrics,
904 actor_info,
905 table_info,
906 sink_info,
907 relation_info,
908 system_param_info,
909 l0_compact_level_count,
910 compact_task_size,
911 compact_task_file_count,
912 compact_task_batch_count,
913 compact_task_trivial_move_sst_count,
914 table_write_throughput,
915 split_compaction_group_count,
916 state_table_count,
917 branched_sst_count,
918 compaction_event_consumed_latency,
919 compaction_event_loop_iteration_latency,
920 auto_schema_change_failure_cnt,
921 auto_schema_change_success_cnt,
922 auto_schema_change_latency,
923 merge_compaction_group_count,
924 time_travel_version_replay_latency,
925 compaction_group_count,
926 compaction_group_size,
927 compaction_group_file_count,
928 compaction_group_throughput,
929 }
930 }
931
932 #[cfg(test)]
933 pub fn for_test(registry: &Registry) -> Self {
934 Self::new(registry)
935 }
936}
937impl Default for MetaMetrics {
938 fn default() -> Self {
939 GLOBAL_META_METRICS.clone()
940 }
941}
942
943pub async fn refresh_system_param_info_metrics(
945 system_params_controller: &SystemParamsControllerRef,
946 meta_metrics: Arc<MetaMetrics>,
947) {
948 let params_info = system_params_controller.get_params().await.get_all();
949
950 meta_metrics.system_param_info.reset();
951 for info in params_info {
952 meta_metrics
953 .system_param_info
954 .with_label_values(&[info.name, &info.value])
955 .set(1);
956 }
957}
958
959pub fn start_worker_info_monitor(
960 metadata_manager: MetadataManager,
961 election_client: ElectionClientRef,
962 interval: Duration,
963 meta_metrics: Arc<MetaMetrics>,
964) -> (JoinHandle<()>, Sender<()>) {
965 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
966 let join_handle = tokio::spawn(async move {
967 let mut monitor_interval = tokio::time::interval(interval);
968 monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
969 loop {
970 tokio::select! {
971 _ = monitor_interval.tick() => {},
973 _ = &mut shutdown_rx => {
975 tracing::info!("Worker number monitor is stopped");
976 return;
977 }
978 }
979
980 let node_map = match metadata_manager.count_worker_node().await {
981 Ok(node_map) => node_map,
982 Err(err) => {
983 tracing::warn!(error = %err.as_report(), "fail to count worker node");
984 continue;
985 }
986 };
987
988 meta_metrics.worker_num.reset();
990 meta_metrics.meta_type.reset();
991
992 for (worker_type, worker_num) in node_map {
993 meta_metrics
994 .worker_num
995 .with_label_values(&[(worker_type.as_str_name())])
996 .set(worker_num as i64);
997 }
998 if let Ok(meta_members) = election_client.get_members().await {
999 meta_metrics
1000 .worker_num
1001 .with_label_values(&[WorkerType::Meta.as_str_name()])
1002 .set(meta_members.len() as i64);
1003 meta_members.into_iter().for_each(|m| {
1004 let role = if m.is_leader { "leader" } else { "follower" };
1005 meta_metrics
1006 .meta_type
1007 .with_label_values(&[m.id.as_str(), role])
1008 .set(1);
1009 });
1010 }
1011 }
1012 });
1013
1014 (join_handle, shutdown_tx)
1015}
1016
1017pub async fn refresh_fragment_info_metrics(
1018 catalog_controller: &CatalogControllerRef,
1019 cluster_controller: &ClusterControllerRef,
1020 hummock_manager: &HummockManagerRef,
1021 meta_metrics: Arc<MetaMetrics>,
1022) {
1023 let worker_nodes = match cluster_controller
1024 .list_workers(Some(WorkerType::ComputeNode.into()), None)
1025 .await
1026 {
1027 Ok(worker_nodes) => worker_nodes,
1028 Err(err) => {
1029 tracing::warn!(error=%err.as_report(), "fail to list worker node");
1030 return;
1031 }
1032 };
1033 let actor_locations = match catalog_controller.list_actor_locations() {
1034 Ok(actor_locations) => actor_locations,
1035 Err(err) => {
1036 tracing::warn!(error=%err.as_report(), "fail to get actor locations");
1037 return;
1038 }
1039 };
1040 let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
1041 Ok(sink_actor_mapping) => sink_actor_mapping,
1042 Err(err) => {
1043 tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
1044 return;
1045 }
1046 };
1047 let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
1048 Ok(fragment_state_tables) => fragment_state_tables,
1049 Err(err) => {
1050 tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
1051 return;
1052 }
1053 };
1054 let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
1055 Ok(mapping) => mapping,
1056 Err(err) => {
1057 tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
1058 return;
1059 }
1060 };
1061
1062 let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
1063 .into_iter()
1064 .map(|worker_node| {
1065 let addr = match worker_node.host {
1066 Some(host) => format!("{}:{}", host.host, host.port),
1067 None => "".to_owned(),
1068 };
1069 (worker_node.id as WorkerId, addr)
1070 })
1071 .collect();
1072 let table_compaction_group_id_mapping = hummock_manager
1073 .get_table_compaction_group_id_mapping()
1074 .await;
1075
1076 meta_metrics.actor_info.reset();
1079 meta_metrics.table_info.reset();
1080 meta_metrics.sink_info.reset();
1081 for actor_location in actor_locations {
1082 let actor_id_str = actor_location.actor_id.to_string();
1083 let fragment_id_str = actor_location.fragment_id.to_string();
1084 if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
1087 meta_metrics
1088 .actor_info
1089 .with_label_values(&[&actor_id_str, &fragment_id_str, address])
1090 .set(1);
1091 }
1092 }
1093 for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
1094 let sink_id_str = sink_id.to_string();
1095 for actor_id in actor_ids {
1096 let actor_id_str = actor_id.to_string();
1097 meta_metrics
1098 .sink_info
1099 .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
1100 .set(1);
1101 }
1102 }
1103 for PartialFragmentStateTables {
1104 fragment_id,
1105 job_id,
1106 state_table_ids,
1107 } in fragment_state_tables
1108 {
1109 let fragment_id_str = fragment_id.to_string();
1110 let job_id_str = job_id.to_string();
1111 for table_id in state_table_ids.into_inner() {
1112 let table_id_str = table_id.to_string();
1113 let (table_name, table_type) = table_name_and_type_mapping
1114 .get(&table_id)
1115 .cloned()
1116 .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
1117 let compaction_group_id = table_compaction_group_id_mapping
1118 .get(&(table_id as u32))
1119 .map(|cg_id| cg_id.to_string())
1120 .unwrap_or_else(|| "unknown".to_owned());
1121 meta_metrics
1122 .table_info
1123 .with_label_values(&[
1124 &job_id_str,
1125 &table_id_str,
1126 &fragment_id_str,
1127 &table_name,
1128 &table_type,
1129 &compaction_group_id,
1130 ])
1131 .set(1);
1132 }
1133 }
1134}
1135
1136pub async fn refresh_relation_info_metrics(
1137 catalog_controller: &CatalogControllerRef,
1138 meta_metrics: Arc<MetaMetrics>,
1139) {
1140 let table_objects = match catalog_controller.list_table_objects().await {
1141 Ok(table_objects) => table_objects,
1142 Err(err) => {
1143 tracing::warn!(error=%err.as_report(), "fail to get table objects");
1144 return;
1145 }
1146 };
1147
1148 let source_objects = match catalog_controller.list_source_objects().await {
1149 Ok(source_objects) => source_objects,
1150 Err(err) => {
1151 tracing::warn!(error=%err.as_report(), "fail to get source objects");
1152 return;
1153 }
1154 };
1155
1156 let sink_objects = match catalog_controller.list_sink_objects().await {
1157 Ok(sink_objects) => sink_objects,
1158 Err(err) => {
1159 tracing::warn!(error=%err.as_report(), "fail to get sink objects");
1160 return;
1161 }
1162 };
1163
1164 meta_metrics.relation_info.reset();
1165
1166 for (id, db, schema, name, resource_group) in table_objects {
1167 meta_metrics
1168 .relation_info
1169 .with_label_values(&[
1170 &id.to_string(),
1171 &db,
1172 &schema,
1173 &name,
1174 &resource_group,
1175 &"table".to_owned(),
1176 ])
1177 .set(1);
1178 }
1179
1180 for (id, db, schema, name, resource_group) in source_objects {
1181 meta_metrics
1182 .relation_info
1183 .with_label_values(&[
1184 &id.to_string(),
1185 &db,
1186 &schema,
1187 &name,
1188 &resource_group,
1189 &"source".to_owned(),
1190 ])
1191 .set(1);
1192 }
1193
1194 for (id, db, schema, name, resource_group) in sink_objects {
1195 meta_metrics
1196 .relation_info
1197 .with_label_values(&[
1198 &id.to_string(),
1199 &db,
1200 &schema,
1201 &name,
1202 &resource_group,
1203 &"sink".to_owned(),
1204 ])
1205 .set(1);
1206 }
1207}
1208
1209pub fn start_info_monitor(
1210 metadata_manager: MetadataManager,
1211 hummock_manager: HummockManagerRef,
1212 system_params_controller: SystemParamsControllerRef,
1213 meta_metrics: Arc<MetaMetrics>,
1214) -> (JoinHandle<()>, Sender<()>) {
1215 const COLLECT_INTERVAL_SECONDS: u64 = 60;
1216
1217 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1218 let join_handle = tokio::spawn(async move {
1219 let mut monitor_interval =
1220 tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
1221 monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1222 loop {
1223 tokio::select! {
1224 _ = monitor_interval.tick() => {},
1226 _ = &mut shutdown_rx => {
1228 tracing::info!("Meta info monitor is stopped");
1229 return;
1230 }
1231 }
1232
1233 refresh_fragment_info_metrics(
1235 &metadata_manager.catalog_controller,
1236 &metadata_manager.cluster_controller,
1237 &hummock_manager,
1238 meta_metrics.clone(),
1239 )
1240 .await;
1241
1242 refresh_relation_info_metrics(
1243 &metadata_manager.catalog_controller,
1244 meta_metrics.clone(),
1245 )
1246 .await;
1247
1248 refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
1250 .await;
1251 }
1252 });
1253
1254 (join_handle, shutdown_tx)
1255}