1use std::collections::HashMap;
16use std::sync::atomic::AtomicU64;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicF64, GenericGaugeVec};
21use prometheus::{
22 Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry, exponential_buckets,
23 histogram_opts, register_gauge_vec_with_registry, register_histogram_vec_with_registry,
24 register_histogram_with_registry, register_int_counter_vec_with_registry,
25 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
26};
27use risingwave_common::metrics::{
28 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
29};
30use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
31use risingwave_common::{
32 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33 register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
36use risingwave_meta_model::WorkerId;
37use risingwave_object_store::object::object_metrics::{
38 GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
39};
40use risingwave_pb::common::WorkerType;
41use thiserror_ext::AsReport;
42use tokio::sync::oneshot::Sender;
43use tokio::task::JoinHandle;
44
45use crate::controller::catalog::CatalogControllerRef;
46use crate::controller::cluster::ClusterControllerRef;
47use crate::controller::utils::PartialFragmentStateTables;
48use crate::hummock::HummockManagerRef;
49use crate::manager::MetadataManager;
50use crate::rpc::ElectionClientRef;
51
52#[derive(Clone)]
53pub struct MetaMetrics {
54 pub worker_num: IntGaugeVec,
57 pub meta_type: IntGaugeVec,
59
60 pub grpc_latency: HistogramVec,
63
64 pub barrier_latency: LabelGuardedHistogramVec,
68 pub barrier_wait_commit_latency: Histogram,
70 pub barrier_send_latency: LabelGuardedHistogramVec,
72 pub all_barrier_nums: LabelGuardedIntGaugeVec,
75 pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
77 pub last_committed_barrier_time: IntGaugeVec,
79
80 pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec, pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec, pub snapshot_backfill_lag: LabelGuardedIntGaugeVec, pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec, pub recovery_failure_cnt: IntCounterVec,
92 pub recovery_latency: HistogramVec,
93
94 pub max_committed_epoch: IntGauge,
97 pub min_committed_epoch: IntGauge,
99 pub level_sst_num: IntGaugeVec,
101 pub level_compact_cnt: IntGaugeVec,
103 pub compact_frequency: IntCounterVec,
105 pub level_file_size: IntGaugeVec,
107 pub version_size: IntGauge,
109 pub current_version_id: IntGauge,
111 pub checkpoint_version_id: IntGauge,
113 pub min_pinned_version_id: IntGauge,
115 pub min_safepoint_version_id: IntGauge,
117 pub write_stop_compaction_groups: IntGaugeVec,
119 pub full_gc_trigger_count: IntGauge,
121 pub full_gc_candidate_object_count: Histogram,
123 pub full_gc_selected_object_count: Histogram,
125 pub version_stats: IntGaugeVec,
127 pub materialized_view_stats: IntGaugeVec,
129 pub stale_object_count: IntGauge,
131 pub stale_object_size: IntGauge,
133 pub old_version_object_count: IntGauge,
135 pub old_version_object_size: IntGauge,
137 pub time_travel_object_count: IntGauge,
139 pub current_version_object_count: IntGauge,
141 pub current_version_object_size: IntGauge,
143 pub total_object_count: IntGauge,
145 pub total_object_size: IntGauge,
147 pub table_change_log_object_count: IntGaugeVec,
149 pub table_change_log_object_size: IntGaugeVec,
151 pub delta_log_count: IntGauge,
153 pub version_checkpoint_latency: Histogram,
155 pub hummock_manager_lock_time: HistogramVec,
157 pub hummock_manager_real_process_time: HistogramVec,
159 pub compact_skip_frequency: IntCounterVec,
161 pub compact_pending_bytes: IntGaugeVec,
163 pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
165 pub level_compact_task_cnt: IntGaugeVec,
167 pub time_after_last_observation: Arc<AtomicU64>,
168 pub l0_compact_level_count: HistogramVec,
169 pub compact_task_size: HistogramVec,
170 pub compact_task_file_count: HistogramVec,
171 pub compact_task_batch_count: HistogramVec,
172 pub split_compaction_group_count: IntCounterVec,
173 pub state_table_count: IntGaugeVec,
174 pub branched_sst_count: IntGaugeVec,
175 pub compact_task_trivial_move_sst_count: HistogramVec,
176
177 pub compaction_event_consumed_latency: Histogram,
178 pub compaction_event_loop_iteration_latency: Histogram,
179
180 pub object_store_metric: Arc<ObjectStoreMetrics>,
183
184 pub source_is_up: LabelGuardedIntGaugeVec,
187 pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,
188
189 pub actor_info: IntGaugeVec,
192 pub table_info: IntGaugeVec,
194 pub sink_info: IntGaugeVec,
196 pub relation_info: IntGaugeVec,
198
199 pub table_write_throughput: IntCounterVec,
201
202 pub merge_compaction_group_count: IntCounterVec,
204
205 pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
207 pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
208 pub auto_schema_change_latency: LabelGuardedHistogramVec,
209
210 pub time_travel_version_replay_latency: Histogram,
211
212 pub compaction_group_count: IntGauge,
213 pub compaction_group_size: IntGaugeVec,
214 pub compaction_group_file_count: IntGaugeVec,
215 pub compaction_group_throughput: IntGaugeVec,
216}
217
218pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
219 LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));
220
221impl MetaMetrics {
222 fn new(registry: &Registry) -> Self {
223 let opts = histogram_opts!(
224 "meta_grpc_duration_seconds",
225 "gRPC latency of meta services",
226 exponential_buckets(0.0001, 2.0, 20).unwrap() );
228 let grpc_latency =
229 register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();
230
231 let opts = histogram_opts!(
232 "meta_barrier_duration_seconds",
233 "barrier latency",
234 exponential_buckets(0.1, 1.5, 20).unwrap() );
236 let barrier_latency =
237 register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
238 .unwrap();
239
240 let opts = histogram_opts!(
241 "meta_barrier_wait_commit_duration_seconds",
242 "barrier_wait_commit_latency",
243 exponential_buckets(0.1, 1.5, 20).unwrap() );
245 let barrier_wait_commit_latency =
246 register_histogram_with_registry!(opts, registry).unwrap();
247
248 let opts = histogram_opts!(
249 "meta_barrier_send_duration_seconds",
250 "barrier send latency",
251 exponential_buckets(0.1, 1.5, 19).unwrap() );
253 let barrier_send_latency =
254 register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
255 .unwrap();
256
257 let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
258 "all_barrier_nums",
259 "num of of all_barrier",
260 &["database_id"],
261 registry
262 )
263 .unwrap();
264 let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
265 "in_flight_barrier_nums",
266 "num of of in_flight_barrier",
267 &["database_id"],
268 registry
269 )
270 .unwrap();
271 let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
272 "last_committed_barrier_time",
273 "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
274 &["database_id"],
275 registry
276 )
277 .unwrap();
278
279 let opts = histogram_opts!(
281 "meta_snapshot_backfill_barrier_duration_seconds",
282 "snapshot backfill barrier latency",
283 exponential_buckets(0.1, 1.5, 20).unwrap() );
285 let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
286 opts,
287 &["table_id", "barrier_type"],
288 registry
289 )
290 .unwrap();
291 let opts = histogram_opts!(
292 "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
293 "snapshot backfill barrier_wait_commit_latency",
294 exponential_buckets(0.1, 1.5, 20).unwrap() );
296 let snapshot_backfill_wait_commit_latency =
297 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
298
299 let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
300 "meta_snapshot_backfill_upstream_lag",
301 "snapshot backfill upstream_lag",
302 &["table_id"],
303 registry
304 )
305 .unwrap();
306 let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
307 "meta_snapshot_backfill_inflight_barrier_num",
308 "snapshot backfill inflight_barrier_num",
309 &["table_id"],
310 registry
311 )
312 .unwrap();
313
314 let max_committed_epoch = register_int_gauge_with_registry!(
315 "storage_max_committed_epoch",
316 "max committed epoch",
317 registry
318 )
319 .unwrap();
320
321 let min_committed_epoch = register_int_gauge_with_registry!(
322 "storage_min_committed_epoch",
323 "min committed epoch",
324 registry
325 )
326 .unwrap();
327
328 let level_sst_num = register_int_gauge_vec_with_registry!(
329 "storage_level_sst_num",
330 "num of SSTs in each level",
331 &["level_index"],
332 registry
333 )
334 .unwrap();
335
336 let level_compact_cnt = register_int_gauge_vec_with_registry!(
337 "storage_level_compact_cnt",
338 "num of SSTs to be merged to next level in each level",
339 &["level_index"],
340 registry
341 )
342 .unwrap();
343
344 let compact_frequency = register_int_counter_vec_with_registry!(
345 "storage_level_compact_frequency",
346 "The number of compactions from one level to another level that have completed or failed.",
347 &["compactor", "group", "task_type", "result"],
348 registry
349 )
350 .unwrap();
351 let compact_skip_frequency = register_int_counter_vec_with_registry!(
352 "storage_skip_compact_frequency",
353 "The number of compactions from one level to another level that have been skipped.",
354 &["level", "type"],
355 registry
356 )
357 .unwrap();
358
359 let version_size =
360 register_int_gauge_with_registry!("storage_version_size", "version size", registry)
361 .unwrap();
362
363 let current_version_id = register_int_gauge_with_registry!(
364 "storage_current_version_id",
365 "current version id",
366 registry
367 )
368 .unwrap();
369
370 let checkpoint_version_id = register_int_gauge_with_registry!(
371 "storage_checkpoint_version_id",
372 "checkpoint version id",
373 registry
374 )
375 .unwrap();
376
377 let min_pinned_version_id = register_int_gauge_with_registry!(
378 "storage_min_pinned_version_id",
379 "min pinned version id",
380 registry
381 )
382 .unwrap();
383
384 let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
385 "storage_write_stop_compaction_groups",
386 "compaction groups of write stop state",
387 &["compaction_group_id"],
388 registry
389 )
390 .unwrap();
391
392 let full_gc_trigger_count = register_int_gauge_with_registry!(
393 "storage_full_gc_trigger_count",
394 "the number of attempts to trigger full GC",
395 registry
396 )
397 .unwrap();
398
399 let opts = histogram_opts!(
400 "storage_full_gc_candidate_object_count",
401 "the number of candidate object to delete after scanning object store",
402 exponential_buckets(1.0, 10.0, 6).unwrap()
403 );
404 let full_gc_candidate_object_count =
405 register_histogram_with_registry!(opts, registry).unwrap();
406
407 let opts = histogram_opts!(
408 "storage_full_gc_selected_object_count",
409 "the number of object to delete after filtering by meta node",
410 exponential_buckets(1.0, 10.0, 6).unwrap()
411 );
412 let full_gc_selected_object_count =
413 register_histogram_with_registry!(opts, registry).unwrap();
414
415 let min_safepoint_version_id = register_int_gauge_with_registry!(
416 "storage_min_safepoint_version_id",
417 "min safepoint version id",
418 registry
419 )
420 .unwrap();
421
422 let level_file_size = register_int_gauge_vec_with_registry!(
423 "storage_level_total_file_size",
424 "KBs total file bytes in each level",
425 &["level_index"],
426 registry
427 )
428 .unwrap();
429
430 let version_stats = register_int_gauge_vec_with_registry!(
431 "storage_version_stats",
432 "per table stats in current hummock version",
433 &["table_id", "metric"],
434 registry
435 )
436 .unwrap();
437
438 let materialized_view_stats = register_int_gauge_vec_with_registry!(
439 "storage_materialized_view_stats",
440 "per materialized view stats in current hummock version",
441 &["table_id", "metric"],
442 registry
443 )
444 .unwrap();
445
446 let stale_object_count = register_int_gauge_with_registry!(
447 "storage_stale_object_count",
448 "total number of objects that is no longer referenced by versions.",
449 registry
450 )
451 .unwrap();
452
453 let stale_object_size = register_int_gauge_with_registry!(
454 "storage_stale_object_size",
455 "total size of objects that is no longer referenced by versions.",
456 registry
457 )
458 .unwrap();
459
460 let old_version_object_count = register_int_gauge_with_registry!(
461 "storage_old_version_object_count",
462 "total number of objects that is still referenced by non-current versions",
463 registry
464 )
465 .unwrap();
466
467 let old_version_object_size = register_int_gauge_with_registry!(
468 "storage_old_version_object_size",
469 "total size of objects that is still referenced by non-current versions",
470 registry
471 )
472 .unwrap();
473
474 let current_version_object_count = register_int_gauge_with_registry!(
475 "storage_current_version_object_count",
476 "total number of objects that is referenced by current version",
477 registry
478 )
479 .unwrap();
480
481 let current_version_object_size = register_int_gauge_with_registry!(
482 "storage_current_version_object_size",
483 "total size of objects that is referenced by current version",
484 registry
485 )
486 .unwrap();
487
488 let total_object_count = register_int_gauge_with_registry!(
489 "storage_total_object_count",
490 "Total number of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
491 registry
492 ).unwrap();
493
494 let total_object_size = register_int_gauge_with_registry!(
495 "storage_total_object_size",
496 "Total size of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
497 registry
498 ).unwrap();
499
500 let table_change_log_object_count = register_int_gauge_vec_with_registry!(
501 "storage_table_change_log_object_count",
502 "per table change log object count",
503 &["table_id"],
504 registry
505 )
506 .unwrap();
507
508 let table_change_log_object_size = register_int_gauge_vec_with_registry!(
509 "storage_table_change_log_object_size",
510 "per table change log object size",
511 &["table_id"],
512 registry
513 )
514 .unwrap();
515
516 let time_travel_object_count = register_int_gauge_with_registry!(
517 "storage_time_travel_object_count",
518 "total number of objects that is referenced by time travel.",
519 registry
520 )
521 .unwrap();
522
523 let delta_log_count = register_int_gauge_with_registry!(
524 "storage_delta_log_count",
525 "total number of hummock version delta log",
526 registry
527 )
528 .unwrap();
529
530 let opts = histogram_opts!(
531 "storage_version_checkpoint_latency",
532 "hummock version checkpoint latency",
533 exponential_buckets(0.1, 1.5, 20).unwrap()
534 );
535 let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();
536
537 let hummock_manager_lock_time = register_histogram_vec_with_registry!(
538 "hummock_manager_lock_time",
539 "latency for hummock manager to acquire the rwlock",
540 &["lock_name", "lock_type"],
541 registry
542 )
543 .unwrap();
544
545 let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
546 "meta_hummock_manager_real_process_time",
547 "latency for hummock manager to really process the request",
548 &["method"],
549 registry
550 )
551 .unwrap();
552
553 let worker_num = register_int_gauge_vec_with_registry!(
554 "worker_num",
555 "number of nodes in the cluster",
556 &["worker_type"],
557 registry,
558 )
559 .unwrap();
560
561 let meta_type = register_int_gauge_vec_with_registry!(
562 "meta_num",
563 "role of meta nodes in the cluster",
564 &["worker_addr", "role"],
565 registry,
566 )
567 .unwrap();
568
569 let compact_pending_bytes = register_int_gauge_vec_with_registry!(
570 "storage_compact_pending_bytes",
571 "bytes of lsm tree needed to reach balance",
572 &["group"],
573 registry
574 )
575 .unwrap();
576
577 let compact_level_compression_ratio = register_gauge_vec_with_registry!(
578 "storage_compact_level_compression_ratio",
579 "compression ratio of each level of the lsm tree",
580 &["group", "level", "algorithm"],
581 registry
582 )
583 .unwrap();
584
585 let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
586 "storage_level_compact_task_cnt",
587 "num of compact_task organized by group and level",
588 &["task"],
589 registry
590 )
591 .unwrap();
592 let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
593
594 let recovery_failure_cnt = register_int_counter_vec_with_registry!(
595 "recovery_failure_cnt",
596 "Number of failed recovery attempts",
597 &["recovery_type"],
598 registry
599 )
600 .unwrap();
601 let opts = histogram_opts!(
602 "recovery_latency",
603 "Latency of the recovery process",
604 exponential_buckets(0.1, 1.5, 20).unwrap() );
606 let recovery_latency =
607 register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();
608
609 let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
610 "auto_schema_change_failure_cnt",
611 "Number of failed auto schema change",
612 &["table_id", "table_name"],
613 registry
614 )
615 .unwrap();
616
617 let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
618 "auto_schema_change_success_cnt",
619 "Number of success auto schema change",
620 &["table_id", "table_name"],
621 registry
622 )
623 .unwrap();
624
625 let opts = histogram_opts!(
626 "auto_schema_change_latency",
627 "Latency of the auto schema change process",
628 exponential_buckets(0.1, 1.5, 20).unwrap() );
630 let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
631 opts,
632 &["table_id", "table_name"],
633 registry
634 )
635 .unwrap();
636
637 let source_is_up = register_guarded_int_gauge_vec_with_registry!(
638 "source_status_is_up",
639 "source is up or not",
640 &["source_id", "source_name"],
641 registry
642 )
643 .unwrap();
644 let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());
645
646 let actor_info = register_int_gauge_vec_with_registry!(
647 "actor_info",
648 "Mapping from actor id to (fragment id, compute node)",
649 &["actor_id", "fragment_id", "compute_node"],
650 registry
651 )
652 .unwrap();
653
654 let table_info = register_int_gauge_vec_with_registry!(
655 "table_info",
656 "Mapping from table id to (actor id, table name)",
657 &[
658 "materialized_view_id",
659 "table_id",
660 "fragment_id",
661 "table_name",
662 "table_type",
663 "compaction_group_id"
664 ],
665 registry
666 )
667 .unwrap();
668
669 let sink_info = register_int_gauge_vec_with_registry!(
670 "sink_info",
671 "Mapping from actor id to (actor id, sink name)",
672 &["actor_id", "sink_id", "sink_name",],
673 registry
674 )
675 .unwrap();
676
677 let relation_info = register_int_gauge_vec_with_registry!(
678 "relation_info",
679 "Information of the database relation (table/source/sink)",
680 &["id", "database", "schema", "name", "resource_group", "type"],
681 registry
682 )
683 .unwrap();
684
685 let l0_compact_level_count = register_histogram_vec_with_registry!(
686 "storage_l0_compact_level_count",
687 "level_count of l0 compact task",
688 &["group", "type"],
689 registry
690 )
691 .unwrap();
692
693 let opts = histogram_opts!(
694 "storage_compact_task_size",
695 "Total size of compact that have been issued to state store",
696 exponential_buckets(1048576.0, 2.0, 16).unwrap()
697 );
698
699 let compact_task_size =
700 register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();
701
702 let compact_task_file_count = register_histogram_vec_with_registry!(
703 "storage_compact_task_file_count",
704 "file count of compact task",
705 &["group", "type"],
706 registry
707 )
708 .unwrap();
709 let opts = histogram_opts!(
710 "storage_compact_task_batch_count",
711 "count of compact task batch",
712 exponential_buckets(1.0, 2.0, 8).unwrap()
713 );
714 let compact_task_batch_count =
715 register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();
716
717 let table_write_throughput = register_int_counter_vec_with_registry!(
718 "storage_commit_write_throughput",
719 "The number of compactions from one level to another level that have been skipped.",
720 &["table_id"],
721 registry
722 )
723 .unwrap();
724
725 let split_compaction_group_count = register_int_counter_vec_with_registry!(
726 "storage_split_compaction_group_count",
727 "Count of trigger split compaction group",
728 &["group"],
729 registry
730 )
731 .unwrap();
732
733 let state_table_count = register_int_gauge_vec_with_registry!(
734 "storage_state_table_count",
735 "Count of stable table per compaction group",
736 &["group"],
737 registry
738 )
739 .unwrap();
740
741 let branched_sst_count = register_int_gauge_vec_with_registry!(
742 "storage_branched_sst_count",
743 "Count of branched sst per compaction group",
744 &["group"],
745 registry
746 )
747 .unwrap();
748
749 let opts = histogram_opts!(
750 "storage_compaction_event_consumed_latency",
751 "The latency(ms) of each event being consumed",
752 exponential_buckets(1.0, 1.5, 30).unwrap() );
754 let compaction_event_consumed_latency =
755 register_histogram_with_registry!(opts, registry).unwrap();
756
757 let opts = histogram_opts!(
758 "storage_compaction_event_loop_iteration_latency",
759 "The latency(ms) of each iteration of the compaction event loop",
760 exponential_buckets(1.0, 1.5, 30).unwrap() );
762 let compaction_event_loop_iteration_latency =
763 register_histogram_with_registry!(opts, registry).unwrap();
764
765 let merge_compaction_group_count = register_int_counter_vec_with_registry!(
766 "storage_merge_compaction_group_count",
767 "Count of trigger merge compaction group",
768 &["group"],
769 registry
770 )
771 .unwrap();
772
773 let opts = histogram_opts!(
774 "storage_time_travel_version_replay_latency",
775 "The latency(ms) of replaying a hummock version for time travel",
776 exponential_buckets(0.01, 10.0, 6).unwrap()
777 );
778 let time_travel_version_replay_latency =
779 register_histogram_with_registry!(opts, registry).unwrap();
780
781 let compaction_group_count = register_int_gauge_with_registry!(
782 "storage_compaction_group_count",
783 "The number of compaction groups",
784 registry,
785 )
786 .unwrap();
787
788 let compaction_group_size = register_int_gauge_vec_with_registry!(
789 "storage_compaction_group_size",
790 "The size of compaction group",
791 &["group"],
792 registry
793 )
794 .unwrap();
795
796 let compaction_group_file_count = register_int_gauge_vec_with_registry!(
797 "storage_compaction_group_file_count",
798 "The file count of compaction group",
799 &["group"],
800 registry
801 )
802 .unwrap();
803
804 let compaction_group_throughput = register_int_gauge_vec_with_registry!(
805 "storage_compaction_group_throughput",
806 "The throughput of compaction group",
807 &["group"],
808 registry
809 )
810 .unwrap();
811
812 let opts = histogram_opts!(
813 "storage_compact_task_trivial_move_sst_count",
814 "sst count of compact trivial-move task",
815 exponential_buckets(1.0, 2.0, 8).unwrap()
816 );
817 let compact_task_trivial_move_sst_count =
818 register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();
819
820 Self {
821 grpc_latency,
822 barrier_latency,
823 barrier_wait_commit_latency,
824 barrier_send_latency,
825 all_barrier_nums,
826 in_flight_barrier_nums,
827 last_committed_barrier_time,
828 snapshot_backfill_barrier_latency,
829 snapshot_backfill_wait_commit_latency,
830 snapshot_backfill_lag,
831 snapshot_backfill_inflight_barrier_num,
832 recovery_failure_cnt,
833 recovery_latency,
834
835 max_committed_epoch,
836 min_committed_epoch,
837 level_sst_num,
838 level_compact_cnt,
839 compact_frequency,
840 compact_skip_frequency,
841 level_file_size,
842 version_size,
843 version_stats,
844 materialized_view_stats,
845 stale_object_count,
846 stale_object_size,
847 old_version_object_count,
848 old_version_object_size,
849 time_travel_object_count,
850 current_version_object_count,
851 current_version_object_size,
852 total_object_count,
853 total_object_size,
854 table_change_log_object_count,
855 table_change_log_object_size,
856 delta_log_count,
857 version_checkpoint_latency,
858 current_version_id,
859 checkpoint_version_id,
860 min_pinned_version_id,
861 min_safepoint_version_id,
862 write_stop_compaction_groups,
863 full_gc_trigger_count,
864 full_gc_candidate_object_count,
865 full_gc_selected_object_count,
866 hummock_manager_lock_time,
867 hummock_manager_real_process_time,
868 time_after_last_observation: Arc::new(AtomicU64::new(0)),
869 worker_num,
870 meta_type,
871 compact_pending_bytes,
872 compact_level_compression_ratio,
873 level_compact_task_cnt,
874 object_store_metric,
875 source_is_up,
876 source_enumerator_metrics,
877 actor_info,
878 table_info,
879 sink_info,
880 relation_info,
881 l0_compact_level_count,
882 compact_task_size,
883 compact_task_file_count,
884 compact_task_batch_count,
885 compact_task_trivial_move_sst_count,
886 table_write_throughput,
887 split_compaction_group_count,
888 state_table_count,
889 branched_sst_count,
890 compaction_event_consumed_latency,
891 compaction_event_loop_iteration_latency,
892 auto_schema_change_failure_cnt,
893 auto_schema_change_success_cnt,
894 auto_schema_change_latency,
895 merge_compaction_group_count,
896 time_travel_version_replay_latency,
897 compaction_group_count,
898 compaction_group_size,
899 compaction_group_file_count,
900 compaction_group_throughput,
901 }
902 }
903
904 #[cfg(test)]
905 pub fn for_test(registry: &Registry) -> Self {
906 Self::new(registry)
907 }
908}
909impl Default for MetaMetrics {
910 fn default() -> Self {
911 GLOBAL_META_METRICS.clone()
912 }
913}
914
915pub fn start_worker_info_monitor(
916 metadata_manager: MetadataManager,
917 election_client: ElectionClientRef,
918 interval: Duration,
919 meta_metrics: Arc<MetaMetrics>,
920) -> (JoinHandle<()>, Sender<()>) {
921 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
922 let join_handle = tokio::spawn(async move {
923 let mut monitor_interval = tokio::time::interval(interval);
924 monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
925 loop {
926 tokio::select! {
927 _ = monitor_interval.tick() => {},
929 _ = &mut shutdown_rx => {
931 tracing::info!("Worker number monitor is stopped");
932 return;
933 }
934 }
935
936 let node_map = match metadata_manager.count_worker_node().await {
937 Ok(node_map) => node_map,
938 Err(err) => {
939 tracing::warn!(error = %err.as_report(), "fail to count worker node");
940 continue;
941 }
942 };
943
944 meta_metrics.worker_num.reset();
946 meta_metrics.meta_type.reset();
947
948 for (worker_type, worker_num) in node_map {
949 meta_metrics
950 .worker_num
951 .with_label_values(&[(worker_type.as_str_name())])
952 .set(worker_num as i64);
953 }
954 if let Ok(meta_members) = election_client.get_members().await {
955 meta_metrics
956 .worker_num
957 .with_label_values(&[WorkerType::Meta.as_str_name()])
958 .set(meta_members.len() as i64);
959 meta_members.into_iter().for_each(|m| {
960 let role = if m.is_leader { "leader" } else { "follower" };
961 meta_metrics
962 .meta_type
963 .with_label_values(&[m.id.as_str(), role])
964 .set(1);
965 });
966 }
967 }
968 });
969
970 (join_handle, shutdown_tx)
971}
972
973pub async fn refresh_fragment_info_metrics(
974 catalog_controller: &CatalogControllerRef,
975 cluster_controller: &ClusterControllerRef,
976 hummock_manager: &HummockManagerRef,
977 meta_metrics: Arc<MetaMetrics>,
978) {
979 let worker_nodes = match cluster_controller
980 .list_workers(Some(WorkerType::ComputeNode.into()), None)
981 .await
982 {
983 Ok(worker_nodes) => worker_nodes,
984 Err(err) => {
985 tracing::warn!(error=%err.as_report(), "fail to list worker node");
986 return;
987 }
988 };
989 let actor_locations = match catalog_controller.list_actor_locations().await {
990 Ok(actor_locations) => actor_locations,
991 Err(err) => {
992 tracing::warn!(error=%err.as_report(), "fail to get actor locations");
993 return;
994 }
995 };
996 let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
997 Ok(sink_actor_mapping) => sink_actor_mapping,
998 Err(err) => {
999 tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
1000 return;
1001 }
1002 };
1003 let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
1004 Ok(fragment_state_tables) => fragment_state_tables,
1005 Err(err) => {
1006 tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
1007 return;
1008 }
1009 };
1010 let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
1011 Ok(mapping) => mapping,
1012 Err(err) => {
1013 tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
1014 return;
1015 }
1016 };
1017
1018 let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
1019 .into_iter()
1020 .map(|worker_node| {
1021 let addr = match worker_node.host {
1022 Some(host) => format!("{}:{}", host.host, host.port),
1023 None => "".to_owned(),
1024 };
1025 (worker_node.id as WorkerId, addr)
1026 })
1027 .collect();
1028 let table_compaction_group_id_mapping = hummock_manager
1029 .get_table_compaction_group_id_mapping()
1030 .await;
1031
1032 meta_metrics.actor_info.reset();
1035 meta_metrics.table_info.reset();
1036 meta_metrics.sink_info.reset();
1037 for actor_location in actor_locations {
1038 let actor_id_str = actor_location.actor_id.to_string();
1039 let fragment_id_str = actor_location.fragment_id.to_string();
1040 if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
1043 meta_metrics
1044 .actor_info
1045 .with_label_values(&[&actor_id_str, &fragment_id_str, address])
1046 .set(1);
1047 }
1048 }
1049 for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
1050 let sink_id_str = sink_id.to_string();
1051 for actor_id in actor_ids {
1052 let actor_id_str = actor_id.to_string();
1053 meta_metrics
1054 .sink_info
1055 .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
1056 .set(1);
1057 }
1058 }
1059 for PartialFragmentStateTables {
1060 fragment_id,
1061 job_id,
1062 state_table_ids,
1063 } in fragment_state_tables
1064 {
1065 let fragment_id_str = fragment_id.to_string();
1066 let job_id_str = job_id.to_string();
1067 for table_id in state_table_ids.into_inner() {
1068 let table_id_str = table_id.to_string();
1069 let (table_name, table_type) = table_name_and_type_mapping
1070 .get(&table_id)
1071 .cloned()
1072 .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
1073 let compaction_group_id = table_compaction_group_id_mapping
1074 .get(&(table_id as u32))
1075 .map(|cg_id| cg_id.to_string())
1076 .unwrap_or_else(|| "unknown".to_owned());
1077 meta_metrics
1078 .table_info
1079 .with_label_values(&[
1080 &job_id_str,
1081 &table_id_str,
1082 &fragment_id_str,
1083 &table_name,
1084 &table_type,
1085 &compaction_group_id,
1086 ])
1087 .set(1);
1088 }
1089 }
1090}
1091
1092pub async fn refresh_relation_info_metrics(
1093 catalog_controller: &CatalogControllerRef,
1094 meta_metrics: Arc<MetaMetrics>,
1095) {
1096 let table_objects = match catalog_controller.list_table_objects().await {
1097 Ok(table_objects) => table_objects,
1098 Err(err) => {
1099 tracing::warn!(error=%err.as_report(), "fail to get table objects");
1100 return;
1101 }
1102 };
1103
1104 let source_objects = match catalog_controller.list_source_objects().await {
1105 Ok(source_objects) => source_objects,
1106 Err(err) => {
1107 tracing::warn!(error=%err.as_report(), "fail to get source objects");
1108 return;
1109 }
1110 };
1111
1112 let sink_objects = match catalog_controller.list_sink_objects().await {
1113 Ok(sink_objects) => sink_objects,
1114 Err(err) => {
1115 tracing::warn!(error=%err.as_report(), "fail to get sink objects");
1116 return;
1117 }
1118 };
1119
1120 meta_metrics.relation_info.reset();
1121
1122 for (id, db, schema, name, resource_group) in table_objects {
1123 meta_metrics
1124 .relation_info
1125 .with_label_values(&[
1126 &id.to_string(),
1127 &db,
1128 &schema,
1129 &name,
1130 &resource_group,
1131 &"table".to_owned(),
1132 ])
1133 .set(1);
1134 }
1135
1136 for (id, db, schema, name, resource_group) in source_objects {
1137 meta_metrics
1138 .relation_info
1139 .with_label_values(&[
1140 &id.to_string(),
1141 &db,
1142 &schema,
1143 &name,
1144 &resource_group,
1145 &"source".to_owned(),
1146 ])
1147 .set(1);
1148 }
1149
1150 for (id, db, schema, name, resource_group) in sink_objects {
1151 meta_metrics
1152 .relation_info
1153 .with_label_values(&[
1154 &id.to_string(),
1155 &db,
1156 &schema,
1157 &name,
1158 &resource_group,
1159 &"sink".to_owned(),
1160 ])
1161 .set(1);
1162 }
1163}
1164
1165pub fn start_fragment_info_monitor(
1166 metadata_manager: MetadataManager,
1167 hummock_manager: HummockManagerRef,
1168 meta_metrics: Arc<MetaMetrics>,
1169) -> (JoinHandle<()>, Sender<()>) {
1170 const COLLECT_INTERVAL_SECONDS: u64 = 60;
1171
1172 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1173 let join_handle = tokio::spawn(async move {
1174 let mut monitor_interval =
1175 tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
1176 monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1177 loop {
1178 tokio::select! {
1179 _ = monitor_interval.tick() => {},
1181 _ = &mut shutdown_rx => {
1183 tracing::info!("Fragment info monitor is stopped");
1184 return;
1185 }
1186 }
1187
1188 refresh_fragment_info_metrics(
1189 &metadata_manager.catalog_controller,
1190 &metadata_manager.cluster_controller,
1191 &hummock_manager,
1192 meta_metrics.clone(),
1193 )
1194 .await;
1195
1196 refresh_relation_info_metrics(
1197 &metadata_manager.catalog_controller,
1198 meta_metrics.clone(),
1199 )
1200 .await;
1201 }
1202 });
1203
1204 (join_handle, shutdown_tx)
1205}