use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicF64, GenericGaugeVec};
use prometheus::{
    GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_meta_model::WorkerId;
use risingwave_object_store::object::object_metrics::{
    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
};
use risingwave_pb::common::WorkerType;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::ClusterControllerRef;
use crate::controller::utils::PartialFragmentStateTables;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::rpc::ElectionClientRef;

#[derive(Clone)]
pub struct MetaMetrics {
    // ********************************** Cluster ************************************
    /// The number of workers in the cluster, labeled by worker type.
    pub worker_num: IntGaugeVec,
    /// The role (leader or follower) of each meta node in the cluster.
    pub meta_type: IntGaugeVec,

    // ********************************** gRPC ************************************
    /// gRPC latency of meta services.
    pub grpc_latency: HistogramVec,

    // ********************************** Barrier ************************************
    /// The duration from barrier injection to commit.
    pub barrier_latency: LabelGuardedHistogramVec,
    /// The duration a barrier waits before it is committed.
    pub barrier_wait_commit_latency: Histogram,
    /// The duration it takes to send a barrier.
    pub barrier_send_latency: LabelGuardedHistogramVec,
    /// The number of uncommitted barriers, per database.
    pub all_barrier_nums: LabelGuardedIntGaugeVec,
    /// The number of in-flight barriers, per database.
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
    /// The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.
    pub last_committed_barrier_time: IntGaugeVec,
    /// The barrier interval of each database.
    pub barrier_interval_by_database: GaugeVec,

    // ********************************** Snapshot backfill ************************************
    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec,
    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec,
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec,
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec,

    // ********************************** Recovery ************************************
    pub recovery_failure_cnt: IntCounterVec,
    pub recovery_latency: HistogramVec,

    // ********************************** Hummock ************************************
    /// Max committed epoch.
    pub max_committed_epoch: IntGauge,
    /// Min committed epoch.
    pub min_committed_epoch: IntGauge,
    /// The number of SSTs in each level.
    pub level_sst_num: IntGaugeVec,
    /// The number of SSTs to be merged to the next level in each level.
    pub level_compact_cnt: IntGaugeVec,
    /// The number of compactions from one level to another level that have completed or failed.
    pub compact_frequency: IntCounterVec,
    /// The total file size of each level.
    pub level_file_size: IntGaugeVec,
    /// The size of the current hummock version.
    pub version_size: IntGauge,
    /// The id of the current hummock version.
    pub current_version_id: IntGauge,
    /// The id of the checkpointed hummock version.
    pub checkpoint_version_id: IntGauge,
    /// The smallest pinned version id.
    pub min_pinned_version_id: IntGauge,
    /// The smallest safepoint version id.
    pub min_safepoint_version_id: IntGauge,
    /// Compaction groups that are in the write-stop state.
    pub write_stop_compaction_groups: IntGaugeVec,
    /// The number of attempts to trigger full GC.
    pub full_gc_trigger_count: IntGauge,
    /// The number of candidate objects to delete after scanning the object store.
    pub full_gc_candidate_object_count: Histogram,
    /// The number of objects to delete after filtering by the meta node.
    pub full_gc_selected_object_count: Histogram,
    /// Per-table stats in the current hummock version.
    pub version_stats: IntGaugeVec,
    /// Per-materialized-view stats in the current hummock version.
    pub materialized_view_stats: IntGaugeVec,
    /// The number of objects that are no longer referenced by any version.
    pub stale_object_count: IntGauge,
    /// The total size of objects that are no longer referenced by any version.
    pub stale_object_size: IntGauge,
    /// The number of objects that are still referenced by non-current versions.
    pub old_version_object_count: IntGauge,
    /// The total size of objects that are still referenced by non-current versions.
    pub old_version_object_size: IntGauge,
    /// The number of objects that are referenced by time travel.
    pub time_travel_object_count: IntGauge,
    /// The number of objects that are referenced by the current version.
    pub current_version_object_count: IntGauge,
    /// The total size of objects that are referenced by the current version.
    pub current_version_object_size: IntGauge,
    /// The number of objects, including dangling ones.
    pub total_object_count: IntGauge,
    /// The total size of objects, including dangling ones.
    pub total_object_size: IntGauge,
    /// The number of table change log objects, per table.
    pub table_change_log_object_count: IntGaugeVec,
    /// The total size of table change log objects, per table.
    pub table_change_log_object_size: IntGaugeVec,
    /// The number of hummock version delta logs.
    pub delta_log_count: IntGauge,
    /// Latency of hummock version checkpoint.
    pub version_checkpoint_latency: Histogram,
    /// Latency for hummock manager to acquire its rwlock.
    pub hummock_manager_lock_time: HistogramVec,
    /// Latency for hummock manager to really process a request after acquiring the lock.
    pub hummock_manager_real_process_time: HistogramVec,
    /// The number of compactions that have been skipped.
    pub compact_skip_frequency: IntCounterVec,
    /// Bytes of the LSM tree needed to reach balance.
    pub compact_pending_bytes: IntGaugeVec,
    /// The compression ratio of each level of the LSM tree.
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    /// The number of compact tasks, per group and level.
    pub level_compact_task_cnt: IntGaugeVec,
    pub time_after_last_observation: Arc<AtomicU64>,
    pub l0_compact_level_count: HistogramVec,
    pub compact_task_size: HistogramVec,
    pub compact_task_file_count: HistogramVec,
    pub compact_task_batch_count: HistogramVec,
    pub split_compaction_group_count: IntCounterVec,
    pub state_table_count: IntGaugeVec,
    pub branched_sst_count: IntGaugeVec,
    pub compact_task_trivial_move_sst_count: HistogramVec,

    pub compaction_event_consumed_latency: Histogram,
    pub compaction_event_loop_iteration_latency: Histogram,

    // ********************************** Object store ************************************
    pub object_store_metric: Arc<ObjectStoreMetrics>,

    // ********************************** Source ************************************
    pub source_is_up: LabelGuardedIntGaugeVec,
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

    // ********************************** Fragment info ************************************
    /// A dummy gauge carrying the mapping from actor id to (fragment id, compute node).
    pub actor_info: IntGaugeVec,
    /// A dummy gauge carrying the mapping from table id to (actor id, table name, ...).
    pub table_info: IntGaugeVec,
    /// A dummy gauge carrying the mapping from actor id to (sink id, sink name).
    pub sink_info: IntGaugeVec,
    /// A dummy gauge carrying information of database relations (table/source/sink).
    pub relation_info: IntGaugeVec,

    pub table_write_throughput: IntCounterVec,

    pub merge_compaction_group_count: IntCounterVec,

    // ********************************** Auto schema change ************************************
    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_latency: LabelGuardedHistogramVec,

    pub time_travel_version_replay_latency: Histogram,

    pub compaction_group_count: IntGauge,
    pub compaction_group_size: IntGaugeVec,
    pub compaction_group_file_count: IntGaugeVec,
    pub compaction_group_throughput: IntGaugeVec,
}

pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));

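// A minimal usage sketch: metrics are recorded through the global handle with
// the standard prometheus API. The gRPC path label below is illustrative only.
//
// GLOBAL_META_METRICS
//     .grpc_latency
//     .with_label_values(&["/meta.ClusterService/ListAllNodes"])
//     .observe(0.003);
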
impl MetaMetrics {
    fn new(registry: &Registry) -> Self {
        let opts = histogram_opts!(
            "meta_grpc_duration_seconds",
            "gRPC latency of meta services",
            exponential_buckets(0.0001, 2.0, 20).unwrap()
        );
        let grpc_latency =
            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_duration_seconds",
            "barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let barrier_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();

        let opts = histogram_opts!(
            "meta_barrier_wait_commit_duration_seconds",
            "barrier wait commit latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let barrier_wait_commit_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_send_duration_seconds",
            "barrier send latency",
            exponential_buckets(0.1, 1.5, 19).unwrap()
        );
        let barrier_send_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();
        let barrier_interval_by_database = register_gauge_vec_with_registry!(
            "meta_barrier_interval_by_database",
            "barrier interval of each database",
            &["database_id"],
            registry
        )
        .unwrap();

        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "all_barrier_nums",
            "num of all barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "in_flight_barrier_nums",
            "num of in-flight barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
            "last_committed_barrier_time",
            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
            &["database_id"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_duration_seconds",
            "snapshot backfill barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "barrier_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
            "snapshot backfill barrier wait commit latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let snapshot_backfill_wait_commit_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();

        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_upstream_lag",
            "snapshot backfill upstream lag",
            &["table_id"],
            registry
        )
        .unwrap();
        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_inflight_barrier_num",
            "snapshot backfill in-flight barrier num",
            &["table_id"],
            registry
        )
        .unwrap();

        let max_committed_epoch = register_int_gauge_with_registry!(
            "storage_max_committed_epoch",
            "max committed epoch",
            registry
        )
        .unwrap();

        let min_committed_epoch = register_int_gauge_with_registry!(
            "storage_min_committed_epoch",
            "min committed epoch",
            registry
        )
        .unwrap();

        let level_sst_num = register_int_gauge_vec_with_registry!(
            "storage_level_sst_num",
            "num of SSTs in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let level_compact_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_cnt",
            "num of SSTs to be merged to next level in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let compact_frequency = register_int_counter_vec_with_registry!(
            "storage_level_compact_frequency",
            "The number of compactions from one level to another level that have completed or failed.",
            &["compactor", "group", "task_type", "result"],
            registry
        )
        .unwrap();
        let compact_skip_frequency = register_int_counter_vec_with_registry!(
            "storage_skip_compact_frequency",
            "The number of compactions from one level to another level that have been skipped.",
            &["level", "type"],
            registry
        )
        .unwrap();

        let version_size =
            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
                .unwrap();

        let current_version_id = register_int_gauge_with_registry!(
            "storage_current_version_id",
            "current version id",
            registry
        )
        .unwrap();

        let checkpoint_version_id = register_int_gauge_with_registry!(
            "storage_checkpoint_version_id",
            "checkpoint version id",
            registry
        )
        .unwrap();

        let min_pinned_version_id = register_int_gauge_with_registry!(
            "storage_min_pinned_version_id",
            "min pinned version id",
            registry
        )
        .unwrap();

        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
            "storage_write_stop_compaction_groups",
            "compaction groups in the write-stop state",
            &["compaction_group_id"],
            registry
        )
        .unwrap();

        let full_gc_trigger_count = register_int_gauge_with_registry!(
            "storage_full_gc_trigger_count",
            "the number of attempts to trigger full GC",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_candidate_object_count",
            "the number of candidate objects to delete after scanning the object store",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_candidate_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_selected_object_count",
            "the number of objects to delete after filtering by the meta node",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_selected_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let min_safepoint_version_id = register_int_gauge_with_registry!(
            "storage_min_safepoint_version_id",
            "min safepoint version id",
            registry
        )
        .unwrap();

        let level_file_size = register_int_gauge_vec_with_registry!(
            "storage_level_total_file_size",
            "total file size of each level, in KB",
            &["level_index"],
            registry
        )
        .unwrap();

        let version_stats = register_int_gauge_vec_with_registry!(
            "storage_version_stats",
            "per table stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let materialized_view_stats = register_int_gauge_vec_with_registry!(
            "storage_materialized_view_stats",
            "per materialized view stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let stale_object_count = register_int_gauge_with_registry!(
            "storage_stale_object_count",
            "total number of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let stale_object_size = register_int_gauge_with_registry!(
            "storage_stale_object_size",
            "total size of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let old_version_object_count = register_int_gauge_with_registry!(
            "storage_old_version_object_count",
            "total number of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let old_version_object_size = register_int_gauge_with_registry!(
            "storage_old_version_object_size",
            "total size of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let current_version_object_count = register_int_gauge_with_registry!(
            "storage_current_version_object_count",
            "total number of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let current_version_object_size = register_int_gauge_with_registry!(
            "storage_current_version_object_size",
            "total size of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let total_object_count = register_int_gauge_with_registry!(
            "storage_total_object_count",
            "Total number of objects, including dangling ones. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        )
        .unwrap();

        let total_object_size = register_int_gauge_with_registry!(
            "storage_total_object_size",
            "Total size of objects, including dangling ones. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        )
        .unwrap();

        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_count",
            "per table change log object count",
            &["table_id"],
            registry
        )
        .unwrap();

        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_size",
            "per table change log object size",
            &["table_id"],
            registry
        )
        .unwrap();

        let time_travel_object_count = register_int_gauge_with_registry!(
            "storage_time_travel_object_count",
            "total number of objects that are referenced by time travel.",
            registry
        )
        .unwrap();

        let delta_log_count = register_int_gauge_with_registry!(
            "storage_delta_log_count",
            "total number of hummock version delta logs",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_version_checkpoint_latency",
            "hummock version checkpoint latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();

        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
            "hummock_manager_lock_time",
            "latency for hummock manager to acquire the rwlock",
            &["lock_name", "lock_type"],
            registry
        )
        .unwrap();

        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
            "meta_hummock_manager_real_process_time",
            "latency for hummock manager to really process the request",
            &["method"],
            registry
        )
        .unwrap();

        let worker_num = register_int_gauge_vec_with_registry!(
            "worker_num",
            "number of nodes in the cluster",
            &["worker_type"],
            registry,
        )
        .unwrap();

        let meta_type = register_int_gauge_vec_with_registry!(
            "meta_num",
            "role of meta nodes in the cluster",
            &["worker_addr", "role"],
            registry,
        )
        .unwrap();

        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
            "storage_compact_pending_bytes",
            "bytes of lsm tree needed to reach balance",
            &["group"],
            registry
        )
        .unwrap();

        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
            "storage_compact_level_compression_ratio",
            "compression ratio of each level of the lsm tree",
            &["group", "level", "algorithm"],
            registry
        )
        .unwrap();

        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_task_cnt",
            "num of compact tasks organized by group and level",
            &["task"],
            registry
        )
        .unwrap();
        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());

        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
            "recovery_failure_cnt",
            "Number of failed recovery attempts",
            &["recovery_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "recovery_latency",
            "Latency of the recovery process",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let recovery_latency =
            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();

        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_failure_cnt",
            "Number of failed auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_success_cnt",
            "Number of successful auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "auto_schema_change_latency",
            "Latency of the auto schema change process",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
            "source_status_is_up",
            "whether the source is up",
            &["source_id", "source_name"],
            registry
        )
        .unwrap();
        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());

        let actor_info = register_int_gauge_vec_with_registry!(
            "actor_info",
            "Mapping from actor id to (fragment id, compute node)",
            &["actor_id", "fragment_id", "compute_node"],
            registry
        )
        .unwrap();

        let table_info = register_int_gauge_vec_with_registry!(
            "table_info",
            "Mapping from table id to (actor id, table name)",
            &[
                "materialized_view_id",
                "table_id",
                "fragment_id",
                "table_name",
                "table_type",
                "compaction_group_id"
            ],
            registry
        )
        .unwrap();

        let sink_info = register_int_gauge_vec_with_registry!(
            "sink_info",
            "Mapping from actor id to (sink id, sink name)",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let relation_info = register_int_gauge_vec_with_registry!(
            "relation_info",
            "Information of the database relation (table/source/sink)",
            &["id", "database", "schema", "name", "resource_group", "type"],
            registry
        )
        .unwrap();

        let l0_compact_level_count = register_histogram_vec_with_registry!(
            "storage_l0_compact_level_count",
            "level count of l0 compact task",
            &["group", "type"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_size",
            "Total size of compact tasks that have been issued to the state store",
            exponential_buckets(1048576.0, 2.0, 16).unwrap()
        );

        let compact_task_size =
            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();

        let compact_task_file_count = register_histogram_vec_with_registry!(
            "storage_compact_task_file_count",
            "file count of compact task",
            &["group", "type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "storage_compact_task_batch_count",
            "count of compact task batches",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_batch_count =
            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();

        let table_write_throughput = register_int_counter_vec_with_registry!(
            "storage_commit_write_throughput",
            "The write throughput of each table, measured at commit time.",
            &["table_id"],
            registry
        )
        .unwrap();

        let split_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_split_compaction_group_count",
            "Count of triggered compaction group splits",
            &["group"],
            registry
        )
        .unwrap();

        let state_table_count = register_int_gauge_vec_with_registry!(
            "storage_state_table_count",
            "Count of state tables per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let branched_sst_count = register_int_gauge_vec_with_registry!(
            "storage_branched_sst_count",
            "Count of branched SSTs per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_consumed_latency",
            "The latency (ms) of each event being consumed",
            exponential_buckets(1.0, 1.5, 30).unwrap()
        );
        let compaction_event_consumed_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_loop_iteration_latency",
            "The latency (ms) of each iteration of the compaction event loop",
            exponential_buckets(1.0, 1.5, 30).unwrap()
        );
        let compaction_event_loop_iteration_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_merge_compaction_group_count",
            "Count of triggered compaction group merges",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_time_travel_version_replay_latency",
            "The latency (ms) of replaying a hummock version for time travel",
            exponential_buckets(0.01, 10.0, 6).unwrap()
        );
        let time_travel_version_replay_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let compaction_group_count = register_int_gauge_with_registry!(
            "storage_compaction_group_count",
            "The number of compaction groups",
            registry,
        )
        .unwrap();

        let compaction_group_size = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_size",
            "The size of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_file_count",
            "The file count of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_throughput",
            "The throughput of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_trivial_move_sst_count",
            "SST count of trivial-move compact tasks",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_trivial_move_sst_count =
            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();

        Self {
            grpc_latency,
            barrier_latency,
            barrier_wait_commit_latency,
            barrier_send_latency,
            all_barrier_nums,
            in_flight_barrier_nums,
            last_committed_barrier_time,
            barrier_interval_by_database,
            snapshot_backfill_barrier_latency,
            snapshot_backfill_wait_commit_latency,
            snapshot_backfill_lag,
            snapshot_backfill_inflight_barrier_num,
            recovery_failure_cnt,
            recovery_latency,

            max_committed_epoch,
            min_committed_epoch,
            level_sst_num,
            level_compact_cnt,
            compact_frequency,
            compact_skip_frequency,
            level_file_size,
            version_size,
            version_stats,
            materialized_view_stats,
            stale_object_count,
            stale_object_size,
            old_version_object_count,
            old_version_object_size,
            time_travel_object_count,
            current_version_object_count,
            current_version_object_size,
            total_object_count,
            total_object_size,
            table_change_log_object_count,
            table_change_log_object_size,
            delta_log_count,
            version_checkpoint_latency,
            current_version_id,
            checkpoint_version_id,
            min_pinned_version_id,
            min_safepoint_version_id,
            write_stop_compaction_groups,
            full_gc_trigger_count,
            full_gc_candidate_object_count,
            full_gc_selected_object_count,
            hummock_manager_lock_time,
            hummock_manager_real_process_time,
            time_after_last_observation: Arc::new(AtomicU64::new(0)),
            worker_num,
            meta_type,
            compact_pending_bytes,
            compact_level_compression_ratio,
            level_compact_task_cnt,
            object_store_metric,
            source_is_up,
            source_enumerator_metrics,
            actor_info,
            table_info,
            sink_info,
            relation_info,
            l0_compact_level_count,
            compact_task_size,
            compact_task_file_count,
            compact_task_batch_count,
            compact_task_trivial_move_sst_count,
            table_write_throughput,
            split_compaction_group_count,
            state_table_count,
            branched_sst_count,
            compaction_event_consumed_latency,
            compaction_event_loop_iteration_latency,
            auto_schema_change_failure_cnt,
            auto_schema_change_success_cnt,
            auto_schema_change_latency,
            merge_compaction_group_count,
            time_travel_version_replay_latency,
            compaction_group_count,
            compaction_group_size,
            compaction_group_file_count,
            compaction_group_throughput,
        }
    }

    #[cfg(test)]
    pub fn for_test(registry: &Registry) -> Self {
        Self::new(registry)
    }
}

impl Default for MetaMetrics {
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}

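/// Starts a monitor task that periodically refreshes the `worker_num` and
/// `meta_type` gauges from the cluster metadata and the election state.
/// Returns the join handle and a shutdown sender.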
pub fn start_worker_info_monitor(
    metadata_manager: MetadataManager,
    election_client: ElectionClientRef,
    interval: Duration,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval = tokio::time::interval(interval);
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for the next interval tick.
                _ = monitor_interval.tick() => {},
                // Stop the monitor on shutdown signal.
                _ = &mut shutdown_rx => {
                    tracing::info!("Worker number monitor is stopped");
                    return;
                }
            }

            let node_map = match metadata_manager.count_worker_node().await {
                Ok(node_map) => node_map,
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
                    continue;
                }
            };

            // Reset both gauges so that labels of removed workers do not linger.
            meta_metrics.worker_num.reset();
            meta_metrics.meta_type.reset();

            for (worker_type, worker_num) in node_map {
                meta_metrics
                    .worker_num
                    .with_label_values(&[(worker_type.as_str_name())])
                    .set(worker_num as i64);
            }
            if let Ok(meta_members) = election_client.get_members().await {
                meta_metrics
                    .worker_num
                    .with_label_values(&[WorkerType::Meta.as_str_name()])
                    .set(meta_members.len() as i64);
                meta_members.into_iter().for_each(|m| {
                    let role = if m.is_leader { "leader" } else { "follower" };
                    meta_metrics
                        .meta_type
                        .with_label_values(&[m.id.as_str(), role])
                        .set(1);
                });
            }
        }
    });

    (join_handle, shutdown_tx)
}

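/// Refreshes the `actor_info`, `table_info` and `sink_info` gauges from the
/// current catalog, cluster and hummock state. All three are reset first so
/// that labels of dropped entities do not linger.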
pub async fn refresh_fragment_info_metrics(
    catalog_controller: &CatalogControllerRef,
    cluster_controller: &ClusterControllerRef,
    hummock_manager: &HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let worker_nodes = match cluster_controller
        .list_workers(Some(WorkerType::ComputeNode.into()), None)
        .await
    {
        Ok(worker_nodes) => worker_nodes,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list worker node");
            return;
        }
    };
    let actor_locations = match catalog_controller.list_actor_locations().await {
        Ok(actor_locations) => actor_locations,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
            return;
        }
    };
    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
        Ok(sink_actor_mapping) => sink_actor_mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
            return;
        }
    };
    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
        Ok(fragment_state_tables) => fragment_state_tables,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
            return;
        }
    };
    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
        Ok(mapping) => mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
            return;
        }
    };

    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
        .into_iter()
        .map(|worker_node| {
            let addr = match worker_node.host {
                Some(host) => format!("{}:{}", host.host, host.port),
                None => "".to_owned(),
            };
            (worker_node.id as WorkerId, addr)
        })
        .collect();
    let table_compaction_group_id_mapping = hummock_manager
        .get_table_compaction_group_id_mapping()
        .await;

    // Start fresh with a reset to clean up outdated labels.
    meta_metrics.actor_info.reset();
    meta_metrics.table_info.reset();
    meta_metrics.sink_info.reset();
    for actor_location in actor_locations {
        let actor_id_str = actor_location.actor_id.to_string();
        let fragment_id_str = actor_location.fragment_id.to_string();
        // Report a dummy gauge with (actor id, fragment id, node address) as labels.
        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
            meta_metrics
                .actor_info
                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
                .set(1);
        }
    }
    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
        let sink_id_str = sink_id.to_string();
        for actor_id in actor_ids {
            let actor_id_str = actor_id.to_string();
            meta_metrics
                .sink_info
                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
                .set(1);
        }
    }
    for PartialFragmentStateTables {
        fragment_id,
        job_id,
        state_table_ids,
    } in fragment_state_tables
    {
        let fragment_id_str = fragment_id.to_string();
        let job_id_str = job_id.to_string();
        for table_id in state_table_ids.into_inner() {
            let table_id_str = table_id.to_string();
            let (table_name, table_type) = table_name_and_type_mapping
                .get(&table_id)
                .cloned()
                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
            let compaction_group_id = table_compaction_group_id_mapping
                .get(&(table_id as u32))
                .map(|cg_id| cg_id.to_string())
                .unwrap_or_else(|| "unknown".to_owned());
            meta_metrics
                .table_info
                .with_label_values(&[
                    &job_id_str,
                    &table_id_str,
                    &fragment_id_str,
                    &table_name,
                    &table_type,
                    &compaction_group_id,
                ])
                .set(1);
        }
    }
}

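/// Refreshes the `relation_info` gauge with one series per table, source and
/// sink, labeled by id, database, schema, name, resource group and relation type.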
pub async fn refresh_relation_info_metrics(
    catalog_controller: &CatalogControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let table_objects = match catalog_controller.list_table_objects().await {
        Ok(table_objects) => table_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table objects");
            return;
        }
    };

    let source_objects = match catalog_controller.list_source_objects().await {
        Ok(source_objects) => source_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get source objects");
            return;
        }
    };

    let sink_objects = match catalog_controller.list_sink_objects().await {
        Ok(sink_objects) => sink_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
            return;
        }
    };

    meta_metrics.relation_info.reset();

    for (id, db, schema, name, resource_group) in table_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                &"table".to_owned(),
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in source_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                &"source".to_owned(),
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in sink_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                &"sink".to_owned(),
            ])
            .set(1);
    }
}

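/// Starts a monitor task that refreshes fragment and relation info metrics
/// every `COLLECT_INTERVAL_SECONDS` (60s). Returns the join handle and a
/// shutdown sender.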
pub fn start_fragment_info_monitor(
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    const COLLECT_INTERVAL_SECONDS: u64 = 60;

    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval =
            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for the next interval tick.
                _ = monitor_interval.tick() => {},
                // Stop the monitor on shutdown signal.
                _ = &mut shutdown_rx => {
                    tracing::info!("Fragment info monitor is stopped");
                    return;
                }
            }

            refresh_fragment_info_metrics(
                &metadata_manager.catalog_controller,
                &metadata_manager.cluster_controller,
                &hummock_manager,
                meta_metrics.clone(),
            )
            .await;

            refresh_relation_info_metrics(
                &metadata_manager.catalog_controller,
                meta_metrics.clone(),
            )
            .await;
        }
    });

    (join_handle, shutdown_tx)
}