use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicF64, GenericGaugeVec};
use prometheus::{
    GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
    LabelGuardedUintGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
};
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_meta_model::WorkerId;
use risingwave_meta_model::table::TableType;
use risingwave_object_store::object::object_metrics::{
    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
};
use risingwave_pb::common::WorkerType;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::ClusterControllerRef;
use crate::controller::system_param::SystemParamsControllerRef;
use crate::controller::utils::PartialFragmentStateTables;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::rpc::ElectionClientRef;

#[derive(Clone)]
pub struct MetaMetrics {
    pub worker_num: IntGaugeVec,
    pub meta_type: IntGaugeVec,

    pub grpc_latency: HistogramVec,

    pub barrier_latency: LabelGuardedHistogramVec,
    pub barrier_wait_commit_latency: Histogram,
    pub barrier_send_latency: LabelGuardedHistogramVec,
    pub all_barrier_nums: LabelGuardedIntGaugeVec,
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
    pub last_committed_barrier_time: IntGaugeVec,
    pub barrier_interval_by_database: GaugeVec,

    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec,
    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec,
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec,
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec,

    pub recovery_failure_cnt: IntCounterVec,
    pub recovery_latency: HistogramVec,

    pub max_committed_epoch: IntGauge,
    pub min_committed_epoch: IntGauge,
    pub level_sst_num: IntGaugeVec,
    pub level_compact_cnt: IntGaugeVec,
    pub compact_frequency: IntCounterVec,
    pub level_file_size: IntGaugeVec,
    pub version_size: IntGauge,
    pub current_version_id: IntGauge,
    pub checkpoint_version_id: IntGauge,
    pub min_pinned_version_id: IntGauge,
    pub min_safepoint_version_id: IntGauge,
    pub write_stop_compaction_groups: IntGaugeVec,
    pub full_gc_trigger_count: IntGauge,
    pub full_gc_candidate_object_count: Histogram,
    pub full_gc_selected_object_count: Histogram,
    pub version_stats: IntGaugeVec,
    pub materialized_view_stats: IntGaugeVec,
    pub stale_object_count: IntGauge,
    pub stale_object_size: IntGauge,
    pub old_version_object_count: IntGauge,
    pub old_version_object_size: IntGauge,
    pub time_travel_object_count: IntGauge,
    pub current_version_object_count: IntGauge,
    pub current_version_object_size: IntGauge,
    pub total_object_count: IntGauge,
    pub total_object_size: IntGauge,
    pub table_change_log_object_count: IntGaugeVec,
    pub table_change_log_object_size: IntGaugeVec,
    pub delta_log_count: IntGauge,
    pub version_checkpoint_latency: Histogram,
    pub hummock_manager_lock_time: HistogramVec,
    pub hummock_manager_real_process_time: HistogramVec,
    pub compact_skip_frequency: IntCounterVec,
    pub compact_pending_bytes: IntGaugeVec,
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    pub level_compact_task_cnt: IntGaugeVec,
    pub time_after_last_observation: Arc<AtomicU64>,
    pub l0_compact_level_count: HistogramVec,
    pub compact_task_size: HistogramVec,
    pub compact_task_file_count: HistogramVec,
    pub compact_task_batch_count: HistogramVec,
    pub split_compaction_group_count: IntCounterVec,
    pub state_table_count: IntGaugeVec,
    pub branched_sst_count: IntGaugeVec,
    pub compact_task_trivial_move_sst_count: HistogramVec,

    pub compaction_event_consumed_latency: Histogram,
    pub compaction_event_loop_iteration_latency: Histogram,
    pub time_travel_vacuum_metadata_latency: Histogram,
    pub time_travel_write_metadata_latency: Histogram,

    pub object_store_metric: Arc<ObjectStoreMetrics>,

    pub source_is_up: LabelGuardedIntGaugeVec,
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

    pub actor_info: IntGaugeVec,
    pub table_info: IntGaugeVec,
    pub sink_info: IntGaugeVec,
    pub relation_info: IntGaugeVec,

    pub system_param_info: IntGaugeVec,

    pub table_write_throughput: IntCounterVec,

    pub merge_compaction_group_count: IntCounterVec,

    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_latency: LabelGuardedHistogramVec,

    pub time_travel_version_replay_latency: Histogram,

    pub compaction_group_count: IntGauge,
    pub compaction_group_size: IntGaugeVec,
    pub compaction_group_file_count: IntGaugeVec,
    pub compaction_group_throughput: IntGaugeVec,

    pub refresh_job_duration: LabelGuardedUintGaugeVec,
    pub refresh_job_finish_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_trigger_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_miss_cnt: LabelGuardedIntCounterVec,
}

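/// Process-wide `MetaMetrics` instance, registered against the global metrics registry.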
pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl MetaMetrics {
    fn new(registry: &Registry) -> Self {
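        // All histogram buckets below are exponential. `exponential_buckets` only
        // fails for a non-positive start, a factor <= 1, or a zero bucket count,
        // so the `unwrap()`s on these constant arguments cannot panic.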
        let opts = histogram_opts!(
            "meta_grpc_duration_seconds",
            "gRPC latency of meta services",
            exponential_buckets(0.0001, 2.0, 20).unwrap()
        );
        let grpc_latency =
            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_duration_seconds",
            "barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let barrier_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();

        let opts = histogram_opts!(
            "meta_barrier_wait_commit_duration_seconds",
            "barrier wait commit latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let barrier_wait_commit_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_send_duration_seconds",
            "barrier send latency",
            exponential_buckets(0.1, 1.5, 19).unwrap()
        );
        let barrier_send_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();
        let barrier_interval_by_database = register_gauge_vec_with_registry!(
            "meta_barrier_interval_by_database",
            "barrier interval of each database",
            &["database_id"],
            registry
        )
        .unwrap();

        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "all_barrier_nums",
            "num of all barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "in_flight_barrier_nums",
            "num of in-flight barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
            "last_committed_barrier_time",
            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
            &["database_id"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_duration_seconds",
            "snapshot backfill barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "barrier_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
            "snapshot backfill barrier wait commit latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let snapshot_backfill_wait_commit_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();

        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_upstream_lag",
            "snapshot backfill upstream lag",
            &["table_id"],
            registry
        )
        .unwrap();
        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_inflight_barrier_num",
            "snapshot backfill in-flight barrier num",
            &["table_id"],
            registry
        )
        .unwrap();

        let max_committed_epoch = register_int_gauge_with_registry!(
            "storage_max_committed_epoch",
            "max committed epoch",
            registry
        )
        .unwrap();

        let min_committed_epoch = register_int_gauge_with_registry!(
            "storage_min_committed_epoch",
            "min committed epoch",
            registry
        )
        .unwrap();

        let level_sst_num = register_int_gauge_vec_with_registry!(
            "storage_level_sst_num",
            "num of SSTs in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let level_compact_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_cnt",
            "num of SSTs to be merged to next level in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let compact_frequency = register_int_counter_vec_with_registry!(
            "storage_level_compact_frequency",
            "The number of compactions from one level to another level that have completed or failed.",
            &["compactor", "group", "task_type", "result"],
            registry
        )
        .unwrap();
        let compact_skip_frequency = register_int_counter_vec_with_registry!(
            "storage_skip_compact_frequency",
            "The number of compactions from one level to another level that have been skipped.",
            &["level", "type"],
            registry
        )
        .unwrap();

        let version_size =
            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
                .unwrap();

        let current_version_id = register_int_gauge_with_registry!(
            "storage_current_version_id",
            "current version id",
            registry
        )
        .unwrap();

        let checkpoint_version_id = register_int_gauge_with_registry!(
            "storage_checkpoint_version_id",
            "checkpoint version id",
            registry
        )
        .unwrap();

        let min_pinned_version_id = register_int_gauge_with_registry!(
            "storage_min_pinned_version_id",
            "min pinned version id",
            registry
        )
        .unwrap();

        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
            "storage_write_stop_compaction_groups",
            "compaction groups in write-stop state",
            &["compaction_group_id"],
            registry
        )
        .unwrap();

        let full_gc_trigger_count = register_int_gauge_with_registry!(
            "storage_full_gc_trigger_count",
            "the number of attempts to trigger full GC",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_candidate_object_count",
            "the number of candidate objects to delete after scanning the object store",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_candidate_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_selected_object_count",
            "the number of objects to delete after filtering by the meta node",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_selected_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let min_safepoint_version_id = register_int_gauge_with_registry!(
            "storage_min_safepoint_version_id",
            "min safepoint version id",
            registry
        )
        .unwrap();

        let level_file_size = register_int_gauge_vec_with_registry!(
            "storage_level_total_file_size",
            "total file size (KB) of each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let version_stats = register_int_gauge_vec_with_registry!(
            "storage_version_stats",
            "per-table stats in the current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let materialized_view_stats = register_int_gauge_vec_with_registry!(
            "storage_materialized_view_stats",
            "per-materialized-view stats in the current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let stale_object_count = register_int_gauge_with_registry!(
            "storage_stale_object_count",
            "total number of objects that are no longer referenced by any version.",
            registry
        )
        .unwrap();

        let stale_object_size = register_int_gauge_with_registry!(
            "storage_stale_object_size",
            "total size of objects that are no longer referenced by any version.",
            registry
        )
        .unwrap();

        let old_version_object_count = register_int_gauge_with_registry!(
            "storage_old_version_object_count",
            "total number of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let old_version_object_size = register_int_gauge_with_registry!(
            "storage_old_version_object_size",
            "total size of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let current_version_object_count = register_int_gauge_with_registry!(
            "storage_current_version_object_count",
            "total number of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let current_version_object_size = register_int_gauge_with_registry!(
            "storage_current_version_object_size",
            "total size of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let total_object_count = register_int_gauge_with_registry!(
            "storage_total_object_count",
            "Total number of objects, including dangling ones. Note that this metric is updated right before full GC, so a subsequent full GC may significantly reduce the actual value without updating the metric.",
            registry
        ).unwrap();

        let total_object_size = register_int_gauge_with_registry!(
            "storage_total_object_size",
            "Total size of objects, including dangling ones. Note that this metric is updated right before full GC, so a subsequent full GC may significantly reduce the actual value without updating the metric.",
            registry
        ).unwrap();

        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_count",
            "per-table change log object count",
            &["table_id"],
            registry
        )
        .unwrap();

        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_size",
            "per-table change log object size",
            &["table_id"],
            registry
        )
        .unwrap();

        let time_travel_object_count = register_int_gauge_with_registry!(
            "storage_time_travel_object_count",
            "total number of objects that are referenced by time travel.",
            registry
        )
        .unwrap();

        let delta_log_count = register_int_gauge_with_registry!(
            "storage_delta_log_count",
            "total number of hummock version delta logs",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_version_checkpoint_latency",
            "hummock version checkpoint latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let version_checkpoint_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
            "hummock_manager_lock_time",
            "latency for the hummock manager to acquire its rwlock",
            &["lock_name", "lock_type"],
            registry
        )
        .unwrap();

        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
            "meta_hummock_manager_real_process_time",
            "latency for the hummock manager to actually process a request",
            &["method"],
            registry
        )
        .unwrap();

        let worker_num = register_int_gauge_vec_with_registry!(
            "worker_num",
            "number of nodes in the cluster",
            &["worker_type"],
            registry,
        )
        .unwrap();

        let meta_type = register_int_gauge_vec_with_registry!(
            "meta_num",
            "role of meta nodes in the cluster",
            &["worker_addr", "role"],
            registry,
        )
        .unwrap();

        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
            "storage_compact_pending_bytes",
            "bytes of the LSM tree that need to be compacted to reach balance",
            &["group"],
            registry
        )
        .unwrap();

        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
            "storage_compact_level_compression_ratio",
            "compression ratio of each level of the LSM tree",
            &["group", "level", "algorithm"],
            registry
        )
        .unwrap();

        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_task_cnt",
            "num of compact tasks organized by group and level",
            &["task"],
            registry
        )
        .unwrap();

        let time_travel_vacuum_metadata_latency = register_histogram_with_registry!(
            histogram_opts!(
                "storage_time_travel_vacuum_metadata_latency",
                "Latency of vacuuming metadata for time travel",
                exponential_buckets(0.1, 1.5, 20).unwrap()
            ),
            registry
        )
        .unwrap();
        let time_travel_write_metadata_latency = register_histogram_with_registry!(
            histogram_opts!(
                "storage_time_travel_write_metadata_latency",
                "Latency of writing metadata for time travel",
                exponential_buckets(0.1, 1.5, 20).unwrap()
            ),
            registry
        )
        .unwrap();

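        // Share the process-global object store metrics rather than registering a
        // second, independent set under this registry.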
        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());

        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
            "recovery_failure_cnt",
            "Number of failed recovery attempts",
            &["recovery_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "recovery_latency",
            "Latency of the recovery process",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let recovery_latency =
            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();

        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_failure_cnt",
            "Number of failed auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_success_cnt",
            "Number of successful auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "auto_schema_change_latency",
            "Latency of the auto schema change process",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
            "source_status_is_up",
            "whether the source is up",
            &["source_id", "source_name"],
            registry
        )
        .unwrap();
        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());

        let actor_info = register_int_gauge_vec_with_registry!(
            "actor_info",
            "Mapping from actor id to (fragment id, compute node)",
            &["actor_id", "fragment_id", "compute_node"],
            registry
        )
        .unwrap();

        let table_info = register_int_gauge_vec_with_registry!(
            "table_info",
            "Mapping from table id to (materialized view id, fragment id, table name)",
            &[
                "materialized_view_id",
                "table_id",
                "fragment_id",
                "table_name",
                "table_type",
                "compaction_group_id"
            ],
            registry
        )
        .unwrap();

        let sink_info = register_int_gauge_vec_with_registry!(
            "sink_info",
            "Mapping from actor id to (sink id, sink name)",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let relation_info = register_int_gauge_vec_with_registry!(
            "relation_info",
            "Information of the database relation (table/source/sink/materialized view/index/internal)",
            &["id", "database", "schema", "name", "resource_group", "type"],
            registry
        )
        .unwrap();

        let l0_compact_level_count = register_histogram_vec_with_registry!(
            "storage_l0_compact_level_count",
            "level count of l0 compact tasks",
            &["group", "type"],
            registry
        )
        .unwrap();

        let system_param_info = register_int_gauge_vec_with_registry!(
            "system_param_info",
            "Information of system parameters",
            &["name", "value"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_size",
            "Total size of compact tasks that have been issued to the state store",
            exponential_buckets(1048576.0, 2.0, 16).unwrap()
        );

        let compact_task_size =
            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();

        let compact_task_file_count = register_histogram_vec_with_registry!(
            "storage_compact_task_file_count",
            "file count of compact tasks",
            &["group", "type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "storage_compact_task_batch_count",
            "count of compact task batches",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_batch_count =
            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();

        let table_write_throughput = register_int_counter_vec_with_registry!(
            "storage_commit_write_throughput",
            "The accumulated write size of each table at commit time.",
            &["table_id"],
            registry
        )
        .unwrap();

        let split_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_split_compaction_group_count",
            "Count of triggered compaction group splits",
            &["group"],
            registry
        )
        .unwrap();

        let state_table_count = register_int_gauge_vec_with_registry!(
            "storage_state_table_count",
            "Count of state tables per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let branched_sst_count = register_int_gauge_vec_with_registry!(
            "storage_branched_sst_count",
            "Count of branched SSTs per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_consumed_latency",
            "The latency (ms) of each event being consumed",
            exponential_buckets(1.0, 1.5, 30).unwrap()
        );
        let compaction_event_consumed_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_loop_iteration_latency",
            "The latency (ms) of each iteration of the compaction event loop",
            exponential_buckets(1.0, 1.5, 30).unwrap()
        );
        let compaction_event_loop_iteration_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_merge_compaction_group_count",
            "Count of triggered compaction group merges",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_time_travel_version_replay_latency",
            "The latency (ms) of replaying a hummock version for time travel",
            exponential_buckets(0.01, 10.0, 6).unwrap()
        );
        let time_travel_version_replay_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let compaction_group_count = register_int_gauge_with_registry!(
            "storage_compaction_group_count",
            "The number of compaction groups",
            registry,
        )
        .unwrap();

        let compaction_group_size = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_size",
            "The size of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_file_count",
            "The file count of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_throughput",
            "The throughput of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_trivial_move_sst_count",
            "SST count of trivial-move compact tasks",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_trivial_move_sst_count =
            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();

        let refresh_job_duration = register_guarded_uint_gauge_vec_with_registry!(
            "meta_refresh_job_duration",
            "The duration of refresh jobs",
            &["table_id", "status"],
            registry
        )
        .unwrap();
        let refresh_job_finish_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_job_finish_cnt",
            "The number of finished refresh jobs",
            &["table_id", "status"],
            registry
        )
        .unwrap();
        let refresh_cron_job_trigger_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_cron_job_trigger_cnt",
            "The number of cron refresh jobs triggered",
            &["table_id"],
            registry
        )
        .unwrap();
        let refresh_cron_job_miss_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_cron_job_miss_cnt",
            "The number of cron refresh jobs missed",
            &["table_id"],
            registry
        )
        .unwrap();

        Self {
            grpc_latency,
            barrier_latency,
            barrier_wait_commit_latency,
            barrier_send_latency,
            all_barrier_nums,
            in_flight_barrier_nums,
            last_committed_barrier_time,
            barrier_interval_by_database,
            snapshot_backfill_barrier_latency,
            snapshot_backfill_wait_commit_latency,
            snapshot_backfill_lag,
            snapshot_backfill_inflight_barrier_num,
            recovery_failure_cnt,
            recovery_latency,

            max_committed_epoch,
            min_committed_epoch,
            level_sst_num,
            level_compact_cnt,
            compact_frequency,
            compact_skip_frequency,
            level_file_size,
            version_size,
            version_stats,
            materialized_view_stats,
            stale_object_count,
            stale_object_size,
            old_version_object_count,
            old_version_object_size,
            time_travel_object_count,
            current_version_object_count,
            current_version_object_size,
            total_object_count,
            total_object_size,
            table_change_log_object_count,
            table_change_log_object_size,
            delta_log_count,
            version_checkpoint_latency,
            current_version_id,
            checkpoint_version_id,
            min_pinned_version_id,
            min_safepoint_version_id,
            write_stop_compaction_groups,
            full_gc_trigger_count,
            full_gc_candidate_object_count,
            full_gc_selected_object_count,
            hummock_manager_lock_time,
            hummock_manager_real_process_time,
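            // Plain shared atomic, not a registered collector: records the time of
            // the last periodic observation.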
            time_after_last_observation: Arc::new(AtomicU64::new(0)),
            worker_num,
            meta_type,
            compact_pending_bytes,
            compact_level_compression_ratio,
            level_compact_task_cnt,
            object_store_metric,
            source_is_up,
            source_enumerator_metrics,
            actor_info,
            table_info,
            sink_info,
            relation_info,
            system_param_info,
            l0_compact_level_count,
            compact_task_size,
            compact_task_file_count,
            compact_task_batch_count,
            compact_task_trivial_move_sst_count,
            table_write_throughput,
            split_compaction_group_count,
            state_table_count,
            branched_sst_count,
            compaction_event_consumed_latency,
            compaction_event_loop_iteration_latency,
            auto_schema_change_failure_cnt,
            auto_schema_change_success_cnt,
            auto_schema_change_latency,
            merge_compaction_group_count,
            time_travel_version_replay_latency,
            compaction_group_count,
            compaction_group_size,
            compaction_group_file_count,
            compaction_group_throughput,
            refresh_job_duration,
            refresh_job_finish_cnt,
            refresh_cron_job_trigger_cnt,
            refresh_cron_job_miss_cnt,
            time_travel_vacuum_metadata_latency,
            time_travel_write_metadata_latency,
        }
    }

    #[cfg(test)]
    pub fn for_test(registry: &Registry) -> Self {
        Self::new(registry)
    }
}

impl Default for MetaMetrics {
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}

pub async fn refresh_system_param_info_metrics(
    system_params_controller: &SystemParamsControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let params_info = system_params_controller.get_params().await.get_all();

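    // Reset first so that parameters whose values have changed don't leave stale
    // (name, value) label pairs behind; each current parameter is then exported
    // as a constant-1 gauge, e.g. `system_param_info{name="...",value="..."} 1`.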
    meta_metrics.system_param_info.reset();
    for info in params_info {
        meta_metrics
            .system_param_info
            .with_label_values(&[info.name, &info.value])
            .set(1);
    }
}

pub fn start_worker_info_monitor(
    metadata_manager: MetadataManager,
    election_client: ElectionClientRef,
    interval: Duration,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval = tokio::time::interval(interval);
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
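        // `Delay` skips missed ticks instead of firing them in a burst after a stall.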
        loop {
            tokio::select! {
                _ = monitor_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Worker number monitor is stopped");
                    return;
                }
            }

            let node_map = match metadata_manager.count_worker_node().await {
                Ok(node_map) => node_map,
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
                    continue;
                }
            };

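            // Reset both gauge vecs so that label sets for workers which have left
            // the cluster are dropped rather than lingering at their last value.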
            meta_metrics.worker_num.reset();
            meta_metrics.meta_type.reset();

            for (worker_type, worker_num) in node_map {
                meta_metrics
                    .worker_num
                    .with_label_values(&[worker_type.as_str_name()])
                    .set(worker_num as i64);
            }
            if let Ok(meta_members) = election_client.get_members().await {
                meta_metrics
                    .worker_num
                    .with_label_values(&[WorkerType::Meta.as_str_name()])
                    .set(meta_members.len() as i64);
                meta_members.into_iter().for_each(|m| {
                    let role = if m.is_leader { "leader" } else { "follower" };
                    meta_metrics
                        .meta_type
                        .with_label_values(&[m.id.as_str(), role])
                        .set(1);
                });
            }
        }
    });

    (join_handle, shutdown_tx)
}

pub async fn refresh_fragment_info_metrics(
    catalog_controller: &CatalogControllerRef,
    cluster_controller: &ClusterControllerRef,
    hummock_manager: &HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let worker_nodes = match cluster_controller
        .list_workers(Some(WorkerType::ComputeNode.into()), None)
        .await
    {
        Ok(worker_nodes) => worker_nodes,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list worker node");
            return;
        }
    };
    let actor_locations = match catalog_controller.list_actor_locations() {
        Ok(actor_locations) => actor_locations,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
            return;
        }
    };
    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
        Ok(sink_actor_mapping) => sink_actor_mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
            return;
        }
    };
    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
        Ok(fragment_state_tables) => fragment_state_tables,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
            return;
        }
    };
    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
        Ok(mapping) => mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
            return;
        }
    };

    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
        .into_iter()
        .map(|worker_node| {
            let addr = match worker_node.host {
                Some(host) => format!("{}:{}", host.host, host.port),
                None => "".to_owned(),
            };
            (worker_node.id, addr)
        })
        .collect();
    let table_compaction_group_id_mapping = hummock_manager
        .get_table_compaction_group_id_mapping()
        .await;

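    // These info metrics encode mappings purely in their label sets with a constant
    // value of 1, so each refresh clears the previous snapshot and re-emits it in full.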
    meta_metrics.actor_info.reset();
    meta_metrics.table_info.reset();
    meta_metrics.sink_info.reset();
    for actor_location in actor_locations {
        let actor_id_str = actor_location.actor_id.to_string();
        let fragment_id_str = actor_location.fragment_id.to_string();
        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
            meta_metrics
                .actor_info
                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
                .set(1);
        }
    }
    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
        let sink_id_str = sink_id.to_string();
        for actor_id in actor_ids {
            let actor_id_str = actor_id.to_string();
            meta_metrics
                .sink_info
                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
                .set(1);
        }
    }
    for PartialFragmentStateTables {
        fragment_id,
        job_id,
        state_table_ids,
    } in fragment_state_tables
    {
        let fragment_id_str = fragment_id.to_string();
        let job_id_str = job_id.to_string();
        for table_id in state_table_ids.into_inner() {
            let table_id_str = table_id.to_string();
            let (table_name, table_type) = table_name_and_type_mapping
                .get(&table_id)
                .cloned()
                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
            let compaction_group_id = table_compaction_group_id_mapping
                .get(&table_id)
                .map(|cg_id| cg_id.to_string())
                .unwrap_or_else(|| "unknown".to_owned());
            meta_metrics
                .table_info
                .with_label_values(&[
                    &job_id_str,
                    &table_id_str,
                    &fragment_id_str,
                    &table_name,
                    &table_type,
                    &compaction_group_id,
                ])
                .set(1);
        }
    }
}

pub async fn refresh_relation_info_metrics(
    catalog_controller: &CatalogControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let table_objects = match catalog_controller.list_table_objects().await {
        Ok(table_objects) => table_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table objects");
            return;
        }
    };

    let source_objects = match catalog_controller.list_source_objects().await {
        Ok(source_objects) => source_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get source objects");
            return;
        }
    };

    let sink_objects = match catalog_controller.list_sink_objects().await {
        Ok(sink_objects) => sink_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
            return;
        }
    };

    meta_metrics.relation_info.reset();

    for (id, db, schema, name, resource_group, table_type) in table_objects {
        let relation_type = match table_type {
            TableType::Table => "table",
            TableType::MaterializedView => "materialized_view",
            TableType::Index | TableType::VectorIndex => "index",
            TableType::Internal => "internal",
        };
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                relation_type,
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in source_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                "source",
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in sink_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                "sink",
            ])
            .set(1);
    }
}

pub fn start_info_monitor(
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    system_params_controller: SystemParamsControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
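    // Rebuilding the info metrics queries the catalog and cluster controllers, so
    // refresh on a fixed one-minute cadence rather than on every change.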
    const COLLECT_INTERVAL_SECONDS: u64 = 60;

    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval =
            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                _ = monitor_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Meta info monitor is stopped");
                    return;
                }
            }

            refresh_fragment_info_metrics(
                &metadata_manager.catalog_controller,
                &metadata_manager.cluster_controller,
                &hummock_manager,
                meta_metrics.clone(),
            )
            .await;

            refresh_relation_info_metrics(
                &metadata_manager.catalog_controller,
                meta_metrics.clone(),
            )
            .await;

            refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
                .await;
        }
    });

    (join_handle, shutdown_tx)
}
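
// A minimal usage sketch (hypothetical test, not part of the upstream file):
// metrics register against a caller-supplied `Registry`, so a fresh registry per
// test avoids the duplicate-registration panic that re-registering against
// `GLOBAL_METRICS_REGISTRY` would cause.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn meta_metrics_smoke() {
        let registry = Registry::new();
        let metrics = MetaMetrics::for_test(&registry);

        // Gauge vecs create a child per label set on first access.
        metrics
            .worker_num
            .with_label_values(&["COMPUTE_NODE"])
            .set(3);
        assert_eq!(
            metrics
                .worker_num
                .with_label_values(&["COMPUTE_NODE"])
                .get(),
            3
        );
    }
}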