risingwave_meta/rpc/metrics.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicF64, GenericGaugeVec};
use prometheus::{
    Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry, exponential_buckets,
    histogram_opts, register_gauge_vec_with_registry, register_histogram_vec_with_registry,
    register_histogram_with_registry, register_int_counter_vec_with_registry,
    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_meta_model::WorkerId;
use risingwave_object_store::object::object_metrics::{
    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
};
use risingwave_pb::common::WorkerType;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::ClusterControllerRef;
use crate::controller::utils::PartialFragmentStateTables;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::rpc::ElectionClientRef;
#[derive(Clone)]
pub struct MetaMetrics {
    // ********************************** Meta ************************************
    /// The number of workers in the cluster.
    pub worker_num: IntGaugeVec,
    /// The roles of all meta nodes in the cluster.
    pub meta_type: IntGaugeVec,

    // ********************************** gRPC ************************************
    /// gRPC latency of meta services.
    pub grpc_latency: HistogramVec,

    // ********************************** Barrier ************************************
    /// The duration from barrier injection to commit.
    /// It is the sum of inflight-latency, sync-latency and wait-commit-latency.
    pub barrier_latency: LabelGuardedHistogramVec,
    /// The duration from barrier completion to commit.
    pub barrier_wait_commit_latency: Histogram,
    /// Latency between each barrier send.
    pub barrier_send_latency: LabelGuardedHistogramVec,
    /// The number of all barriers. It is the sum of barriers that are in-flight or completed but
    /// waiting for other barriers.
    pub all_barrier_nums: LabelGuardedIntGaugeVec,
    /// The number of in-flight barriers.
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
    /// The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.
    pub last_committed_barrier_time: IntGaugeVec,

    // ********************************** Snapshot Backfill ***************************
    /// The barrier latency in seconds of `table_id` and snapshot backfill `barrier_type`.
    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec, // (table_id, barrier_type)
    /// The latency of commit epoch of `table_id`.
    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec, // (table_id, )
    /// The lag between the upstream epoch and the downstream epoch.
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec, // (table_id, )
    /// The number of in-flight barriers of `table_id`.
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec, // (table_id, )

    // ********************************** Recovery ************************************
    pub recovery_failure_cnt: IntCounterVec,
    pub recovery_latency: HistogramVec,

    // ********************************** Hummock ************************************
    /// Max committed epoch.
    pub max_committed_epoch: IntGauge,
    /// Min committed epoch.
    pub min_committed_epoch: IntGauge,
    /// The number of SSTs in each level.
    pub level_sst_num: IntGaugeVec,
    /// The number of SSTs to be merged to the next level in each level.
    pub level_compact_cnt: IntGaugeVec,
    /// The number of compact tasks.
    pub compact_frequency: IntCounterVec,
    /// Size of each level.
    pub level_file_size: IntGaugeVec,
    /// Hummock version size.
    pub version_size: IntGauge,
    /// The version id of the current version.
    pub current_version_id: IntGauge,
    /// The version id of the checkpoint version.
    pub checkpoint_version_id: IntGauge,
    /// The smallest version id that is being pinned by worker nodes.
    pub min_pinned_version_id: IntGauge,
    /// The smallest version id that is being guarded by meta node safe points.
    pub min_safepoint_version_id: IntGauge,
    /// Compaction groups that are in the write-stop state.
    pub write_stop_compaction_groups: IntGaugeVec,
    /// The number of attempts to trigger full GC.
    pub full_gc_trigger_count: IntGauge,
    /// The number of candidate objects to delete after scanning the object store.
    pub full_gc_candidate_object_count: Histogram,
    /// The number of objects to delete after filtering by the meta node.
    pub full_gc_selected_object_count: Histogram,
    /// Hummock version stats.
    pub version_stats: IntGaugeVec,
    /// Materialized view stats in the current Hummock version.
    pub materialized_view_stats: IntGaugeVec,
    /// Total number of objects that are no longer referenced by versions.
    pub stale_object_count: IntGauge,
    /// Total size of objects that are no longer referenced by versions.
    pub stale_object_size: IntGauge,
    /// Total number of objects that are still referenced by non-current versions.
    pub old_version_object_count: IntGauge,
    /// Total size of objects that are still referenced by non-current versions.
    pub old_version_object_size: IntGauge,
    /// Total number of objects that are referenced by time travel.
    pub time_travel_object_count: IntGauge,
    /// Total number of objects that are referenced by the current version.
    pub current_version_object_count: IntGauge,
    /// Total size of objects that are referenced by the current version.
    pub current_version_object_size: IntGauge,
    /// Total number of objects, including dangling objects.
    pub total_object_count: IntGauge,
    /// Total size of objects, including dangling objects.
    pub total_object_size: IntGauge,
    /// Number of objects per table change log.
    pub table_change_log_object_count: IntGaugeVec,
    /// Size of objects per table change log.
    pub table_change_log_object_size: IntGaugeVec,
    /// The number of hummock version delta logs.
    pub delta_log_count: IntGauge,
    /// Latency of version checkpoint.
    pub version_checkpoint_latency: Histogram,
    /// Latency for the hummock manager to acquire a lock.
    pub hummock_manager_lock_time: HistogramVec,
    /// Latency for the hummock manager to actually process a request after acquiring the lock.
    pub hummock_manager_real_process_time: HistogramVec,
    /// The number of compactions from one level to another level that have been skipped.
    pub compact_skip_frequency: IntCounterVec,
    /// Bytes of the LSM tree needed to reach balance.
    pub compact_pending_bytes: IntGaugeVec,
    /// Per-level compression ratio.
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    /// Per-level number of running compaction tasks.
    pub level_compact_task_cnt: IntGaugeVec,
    pub time_after_last_observation: Arc<AtomicU64>,
    pub l0_compact_level_count: HistogramVec,
    pub compact_task_size: HistogramVec,
    pub compact_task_file_count: HistogramVec,
    pub compact_task_batch_count: HistogramVec,
    pub split_compaction_group_count: IntCounterVec,
    pub state_table_count: IntGaugeVec,
    pub branched_sst_count: IntGaugeVec,
    pub compact_task_trivial_move_sst_count: HistogramVec,

    pub compaction_event_consumed_latency: Histogram,
    pub compaction_event_loop_iteration_latency: Histogram,

    // ********************************** Object Store ************************************
    // Object store related metrics (for backup/restore and version checkpoint).
    pub object_store_metric: Arc<ObjectStoreMetrics>,

    // ********************************** Source ************************************
    /// Whether each source is still up, as seen by the source supervisor.
    pub source_is_up: LabelGuardedIntGaugeVec,
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

    // ********************************** Fragment ************************************
    /// A dummy gauge metric whose labels carry the mapping from actor id to fragment id.
    pub actor_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry the mapping from table id to actor id.
    pub table_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry the mapping from actor id to sink id.
    pub sink_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry relation info.
    pub relation_info: IntGaugeVec,

    /// Write throughput of commit epoch for each table.
    pub table_write_throughput: IntCounterVec,

    /// The number of compaction groups that have been triggered to merge.
    pub merge_compaction_group_count: IntCounterVec,

    // ********************************** Auto Schema Change ************************************
    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_latency: LabelGuardedHistogramVec,

    pub time_travel_version_replay_latency: Histogram,

    pub compaction_group_count: IntGauge,
    pub compaction_group_size: IntGaugeVec,
    pub compaction_group_file_count: IntGaugeVec,
    pub compaction_group_throughput: IntGaugeVec,
}

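/// Process-global [`MetaMetrics`], lazily registered against the global
/// metrics registry on first access.
///
/// A minimal usage sketch; the gRPC path below is illustrative only, not
/// necessarily a path the meta service exposes:
///
/// ```ignore
/// use crate::rpc::metrics::GLOBAL_META_METRICS;
///
/// // Record the latency of one gRPC call under the `path` label.
/// GLOBAL_META_METRICS
///     .grpc_latency
///     .with_label_values(&["/meta.ClusterService/AddWorkerNode"])
///     .observe(0.012);
/// ```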
pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl MetaMetrics {
    fn new(registry: &Registry) -> Self {
        let opts = histogram_opts!(
            "meta_grpc_duration_seconds",
            "gRPC latency of meta services",
            exponential_buckets(0.0001, 2.0, 20).unwrap() // max 52s
        );
        let grpc_latency =
            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_duration_seconds",
            "barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let barrier_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();

        let opts = histogram_opts!(
            "meta_barrier_wait_commit_duration_seconds",
            "barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let barrier_wait_commit_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_send_duration_seconds",
            "barrier send latency",
            exponential_buckets(0.1, 1.5, 19).unwrap() // max 148s
        );
        let barrier_send_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();

        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "all_barrier_nums",
            "num of all_barrier",
            &["database_id"],
            registry
        )
        .unwrap();
        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "in_flight_barrier_nums",
            "num of in_flight_barrier",
            &["database_id"],
            registry
        )
        .unwrap();
        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
            "last_committed_barrier_time",
            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
            &["database_id"],
            registry
        )
        .unwrap();

        // snapshot backfill metrics
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_duration_seconds",
            "snapshot backfill barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "barrier_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
            "snapshot backfill barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let snapshot_backfill_wait_commit_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();

        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_upstream_lag",
            "snapshot backfill upstream_lag",
            &["table_id"],
            registry
        )
        .unwrap();
        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_inflight_barrier_num",
            "snapshot backfill inflight_barrier_num",
            &["table_id"],
            registry
        )
        .unwrap();

        let max_committed_epoch = register_int_gauge_with_registry!(
            "storage_max_committed_epoch",
            "max committed epoch",
            registry
        )
        .unwrap();

        let min_committed_epoch = register_int_gauge_with_registry!(
            "storage_min_committed_epoch",
            "min committed epoch",
            registry
        )
        .unwrap();

        let level_sst_num = register_int_gauge_vec_with_registry!(
            "storage_level_sst_num",
            "num of SSTs in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let level_compact_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_cnt",
            "num of SSTs to be merged to next level in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let compact_frequency = register_int_counter_vec_with_registry!(
            "storage_level_compact_frequency",
            "The number of compactions from one level to another level that have completed or failed.",
            &["compactor", "group", "task_type", "result"],
            registry
        )
        .unwrap();
        let compact_skip_frequency = register_int_counter_vec_with_registry!(
            "storage_skip_compact_frequency",
            "The number of compactions from one level to another level that have been skipped.",
            &["level", "type"],
            registry
        )
        .unwrap();

        let version_size =
            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
                .unwrap();

        let current_version_id = register_int_gauge_with_registry!(
            "storage_current_version_id",
            "current version id",
            registry
        )
        .unwrap();

        let checkpoint_version_id = register_int_gauge_with_registry!(
            "storage_checkpoint_version_id",
            "checkpoint version id",
            registry
        )
        .unwrap();

        let min_pinned_version_id = register_int_gauge_with_registry!(
            "storage_min_pinned_version_id",
            "min pinned version id",
            registry
        )
        .unwrap();

        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
            "storage_write_stop_compaction_groups",
            "compaction groups in write-stop state",
            &["compaction_group_id"],
            registry
        )
        .unwrap();

        let full_gc_trigger_count = register_int_gauge_with_registry!(
            "storage_full_gc_trigger_count",
            "the number of attempts to trigger full GC",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_candidate_object_count",
            "the number of candidate objects to delete after scanning the object store",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_candidate_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_selected_object_count",
            "the number of objects to delete after filtering by the meta node",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_selected_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let min_safepoint_version_id = register_int_gauge_with_registry!(
            "storage_min_safepoint_version_id",
            "min safepoint version id",
            registry
        )
        .unwrap();

        let level_file_size = register_int_gauge_vec_with_registry!(
            "storage_level_total_file_size",
            "total file size (KB) in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let version_stats = register_int_gauge_vec_with_registry!(
            "storage_version_stats",
            "per table stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let materialized_view_stats = register_int_gauge_vec_with_registry!(
            "storage_materialized_view_stats",
            "per materialized view stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let stale_object_count = register_int_gauge_with_registry!(
            "storage_stale_object_count",
            "total number of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let stale_object_size = register_int_gauge_with_registry!(
            "storage_stale_object_size",
            "total size of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let old_version_object_count = register_int_gauge_with_registry!(
            "storage_old_version_object_count",
            "total number of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let old_version_object_size = register_int_gauge_with_registry!(
            "storage_old_version_object_size",
            "total size of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let current_version_object_count = register_int_gauge_with_registry!(
            "storage_current_version_object_count",
            "total number of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let current_version_object_size = register_int_gauge_with_registry!(
            "storage_current_version_object_size",
            "total size of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let total_object_count = register_int_gauge_with_registry!(
            "storage_total_object_count",
            "Total number of objects, including dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
            registry
        ).unwrap();

        let total_object_size = register_int_gauge_with_registry!(
            "storage_total_object_size",
            "Total size of objects, including dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
            registry
        ).unwrap();

        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_count",
            "per table change log object count",
            &["table_id"],
            registry
        )
        .unwrap();

        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_size",
            "per table change log object size",
            &["table_id"],
            registry
        )
        .unwrap();

        let time_travel_object_count = register_int_gauge_with_registry!(
            "storage_time_travel_object_count",
            "total number of objects that are referenced by time travel.",
            registry
        )
        .unwrap();

        let delta_log_count = register_int_gauge_with_registry!(
            "storage_delta_log_count",
            "total number of hummock version delta logs",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_version_checkpoint_latency",
            "hummock version checkpoint latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();

        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
            "hummock_manager_lock_time",
            "latency for hummock manager to acquire the rwlock",
            &["lock_name", "lock_type"],
            registry
        )
        .unwrap();

        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
            "meta_hummock_manager_real_process_time",
            "latency for hummock manager to really process the request",
            &["method"],
            registry
        )
        .unwrap();

        let worker_num = register_int_gauge_vec_with_registry!(
            "worker_num",
            "number of nodes in the cluster",
            &["worker_type"],
            registry,
        )
        .unwrap();

        let meta_type = register_int_gauge_vec_with_registry!(
            "meta_num",
            "role of meta nodes in the cluster",
            &["worker_addr", "role"],
            registry,
        )
        .unwrap();

        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
            "storage_compact_pending_bytes",
            "bytes of lsm tree needed to reach balance",
            &["group"],
            registry
        )
        .unwrap();

        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
            "storage_compact_level_compression_ratio",
            "compression ratio of each level of the lsm tree",
            &["group", "level", "algorithm"],
            registry
        )
        .unwrap();

        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_task_cnt",
            "num of compact_task organized by group and level",
            &["task"],
            registry
        )
        .unwrap();

        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());

        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
            "recovery_failure_cnt",
            "Number of failed recovery attempts",
            &["recovery_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "recovery_latency",
            "Latency of the recovery process",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let recovery_latency =
            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();

        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_failure_cnt",
            "Number of failed auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_success_cnt",
            "Number of successful auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "auto_schema_change_latency",
            "Latency of the auto schema change process",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
            "source_status_is_up",
            "source is up or not",
            &["source_id", "source_name"],
            registry
        )
        .unwrap();
        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());

        let actor_info = register_int_gauge_vec_with_registry!(
            "actor_info",
            "Mapping from actor id to (fragment id, compute node)",
            &["actor_id", "fragment_id", "compute_node"],
            registry
        )
        .unwrap();

        let table_info = register_int_gauge_vec_with_registry!(
            "table_info",
            "Mapping from table id to (materialized view id, fragment id, table name, table type, compaction group id)",
            &[
                "materialized_view_id",
                "table_id",
                "fragment_id",
                "table_name",
                "table_type",
                "compaction_group_id"
            ],
            registry
        )
        .unwrap();

        let sink_info = register_int_gauge_vec_with_registry!(
            "sink_info",
            "Mapping from actor id to (sink id, sink name)",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let relation_info = register_int_gauge_vec_with_registry!(
            "relation_info",
            "Information of the database relation (table/source/sink)",
            &["id", "database", "schema", "name", "resource_group", "type"],
            registry
        )
        .unwrap();

        let l0_compact_level_count = register_histogram_vec_with_registry!(
            "storage_l0_compact_level_count",
            "level_count of l0 compact task",
            &["group", "type"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_size",
            "Total size of compact tasks that have been issued to the state store",
            exponential_buckets(1048576.0, 2.0, 16).unwrap()
        );

        let compact_task_size =
            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();

        let compact_task_file_count = register_histogram_vec_with_registry!(
            "storage_compact_task_file_count",
            "file count of compact task",
            &["group", "type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "storage_compact_task_batch_count",
            "count of compact task batch",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_batch_count =
            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();

        let table_write_throughput = register_int_counter_vec_with_registry!(
            "storage_commit_write_throughput",
            "The write throughput of commit epoch per table.",
            &["table_id"],
            registry
        )
        .unwrap();

        let split_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_split_compaction_group_count",
            "Count of triggered compaction group splits",
            &["group"],
            registry
        )
        .unwrap();

        let state_table_count = register_int_gauge_vec_with_registry!(
            "storage_state_table_count",
            "Count of state tables per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let branched_sst_count = register_int_gauge_vec_with_registry!(
            "storage_branched_sst_count",
            "Count of branched SSTs per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_consumed_latency",
            "The latency (ms) of each event being consumed",
            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
        );
        let compaction_event_consumed_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_loop_iteration_latency",
            "The latency (ms) of each iteration of the compaction event loop",
            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
        );
        let compaction_event_loop_iteration_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_merge_compaction_group_count",
            "Count of triggered compaction group merges",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_time_travel_version_replay_latency",
            "The latency (ms) of replaying a hummock version for time travel",
            exponential_buckets(0.01, 10.0, 6).unwrap()
        );
        let time_travel_version_replay_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let compaction_group_count = register_int_gauge_with_registry!(
            "storage_compaction_group_count",
            "The number of compaction groups",
            registry,
        )
        .unwrap();

        let compaction_group_size = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_size",
            "The size of compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_file_count",
            "The file count of compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_throughput",
            "The throughput of compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_trivial_move_sst_count",
            "sst count of compact trivial-move task",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_trivial_move_sst_count =
            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();

        Self {
            grpc_latency,
            barrier_latency,
            barrier_wait_commit_latency,
            barrier_send_latency,
            all_barrier_nums,
            in_flight_barrier_nums,
            last_committed_barrier_time,
            snapshot_backfill_barrier_latency,
            snapshot_backfill_wait_commit_latency,
            snapshot_backfill_lag,
            snapshot_backfill_inflight_barrier_num,
            recovery_failure_cnt,
            recovery_latency,

            max_committed_epoch,
            min_committed_epoch,
            level_sst_num,
            level_compact_cnt,
            compact_frequency,
            compact_skip_frequency,
            level_file_size,
            version_size,
            version_stats,
            materialized_view_stats,
            stale_object_count,
            stale_object_size,
            old_version_object_count,
            old_version_object_size,
            time_travel_object_count,
            current_version_object_count,
            current_version_object_size,
            total_object_count,
            total_object_size,
            table_change_log_object_count,
            table_change_log_object_size,
            delta_log_count,
            version_checkpoint_latency,
            current_version_id,
            checkpoint_version_id,
            min_pinned_version_id,
            min_safepoint_version_id,
            write_stop_compaction_groups,
            full_gc_trigger_count,
            full_gc_candidate_object_count,
            full_gc_selected_object_count,
            hummock_manager_lock_time,
            hummock_manager_real_process_time,
            time_after_last_observation: Arc::new(AtomicU64::new(0)),
            worker_num,
            meta_type,
            compact_pending_bytes,
            compact_level_compression_ratio,
            level_compact_task_cnt,
            object_store_metric,
            source_is_up,
            source_enumerator_metrics,
            actor_info,
            table_info,
            sink_info,
            relation_info,
            l0_compact_level_count,
            compact_task_size,
            compact_task_file_count,
            compact_task_batch_count,
            compact_task_trivial_move_sst_count,
            table_write_throughput,
            split_compaction_group_count,
            state_table_count,
            branched_sst_count,
            compaction_event_consumed_latency,
            compaction_event_loop_iteration_latency,
            auto_schema_change_failure_cnt,
            auto_schema_change_success_cnt,
            auto_schema_change_latency,
            merge_compaction_group_count,
            time_travel_version_replay_latency,
            compaction_group_count,
            compaction_group_size,
            compaction_group_file_count,
            compaction_group_throughput,
        }
    }

    #[cfg(test)]
    pub fn for_test(registry: &Registry) -> Self {
        Self::new(registry)
    }
}

impl Default for MetaMetrics {
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}

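/// Spawns a background task that periodically refreshes the `worker_num` and
/// `meta_type` gauges, returning its join handle together with a oneshot
/// sender that stops the loop.
///
/// A usage sketch, assuming the metadata manager and election client are
/// already constructed elsewhere:
///
/// ```ignore
/// let (handle, shutdown) = start_worker_info_monitor(
///     metadata_manager,
///     election_client,
///     Duration::from_secs(15),
///     Arc::new(MetaMetrics::default()),
/// );
/// // ... later, on shutdown:
/// let _ = shutdown.send(());
/// handle.await.unwrap();
/// ```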
pub fn start_worker_info_monitor(
    metadata_manager: MetadataManager,
    election_client: ElectionClientRef,
    interval: Duration,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval = tokio::time::interval(interval);
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for interval
                _ = monitor_interval.tick() => {},
                // Shutdown monitor
                _ = &mut shutdown_rx => {
                    tracing::info!("Worker number monitor is stopped");
                    return;
                }
            }

            let node_map = match metadata_manager.count_worker_node().await {
                Ok(node_map) => node_map,
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
                    continue;
                }
            };

            // Reset metrics to clean up stale labels, e.g. invalid lease ids
            meta_metrics.worker_num.reset();
            meta_metrics.meta_type.reset();

            for (worker_type, worker_num) in node_map {
                meta_metrics
                    .worker_num
                    .with_label_values(&[worker_type.as_str_name()])
                    .set(worker_num as i64);
            }
            if let Ok(meta_members) = election_client.get_members().await {
                meta_metrics
                    .worker_num
                    .with_label_values(&[WorkerType::Meta.as_str_name()])
                    .set(meta_members.len() as i64);
                meta_members.into_iter().for_each(|m| {
                    let role = if m.is_leader { "leader" } else { "follower" };
                    meta_metrics
                        .meta_type
                        .with_label_values(&[m.id.as_str(), role])
                        .set(1);
                });
            }
        }
    });

    (join_handle, shutdown_tx)
}

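/// Collects fragment-level info from the catalog, cluster, and hummock
/// managers and re-reports the `actor_info`, `sink_info`, and `table_info`
/// dummy gauges from scratch. Any listing error aborts this refresh; the
/// next tick of the monitor loop retries.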
pub async fn refresh_fragment_info_metrics(
    catalog_controller: &CatalogControllerRef,
    cluster_controller: &ClusterControllerRef,
    hummock_manager: &HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let worker_nodes = match cluster_controller
        .list_workers(Some(WorkerType::ComputeNode.into()), None)
        .await
    {
        Ok(worker_nodes) => worker_nodes,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list worker node");
            return;
        }
    };
    let actor_locations = match catalog_controller.list_actor_locations().await {
        Ok(actor_locations) => actor_locations,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
            return;
        }
    };
    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
        Ok(sink_actor_mapping) => sink_actor_mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
            return;
        }
    };
    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
        Ok(fragment_state_tables) => fragment_state_tables,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
            return;
        }
    };
    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
        Ok(mapping) => mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
            return;
        }
    };

    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
        .into_iter()
        .map(|worker_node| {
            let addr = match worker_node.host {
                Some(host) => format!("{}:{}", host.host, host.port),
                None => "".to_owned(),
            };
            (worker_node.id as WorkerId, addr)
        })
        .collect();
    let table_compaction_group_id_mapping = hummock_manager
        .get_table_compaction_group_id_mapping()
        .await;

    // Start fresh with a reset to clear all outdated labels. This is safe since we always
    // report full info on each interval.
    meta_metrics.actor_info.reset();
    meta_metrics.table_info.reset();
    meta_metrics.sink_info.reset();
    for actor_location in actor_locations {
        let actor_id_str = actor_location.actor_id.to_string();
        let fragment_id_str = actor_location.fragment_id.to_string();
        // Report a dummy gauge metric with (actor id, fragment id, node
        // address) as its labels
        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
            meta_metrics
                .actor_info
                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
                .set(1);
        }
    }
    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
        let sink_id_str = sink_id.to_string();
        for actor_id in actor_ids {
            let actor_id_str = actor_id.to_string();
            meta_metrics
                .sink_info
                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
                .set(1);
        }
    }
    for PartialFragmentStateTables {
        fragment_id,
        job_id,
        state_table_ids,
    } in fragment_state_tables
    {
        let fragment_id_str = fragment_id.to_string();
        let job_id_str = job_id.to_string();
        for table_id in state_table_ids.into_inner() {
            let table_id_str = table_id.to_string();
            let (table_name, table_type) = table_name_and_type_mapping
                .get(&table_id)
                .cloned()
                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
            let compaction_group_id = table_compaction_group_id_mapping
                .get(&(table_id as u32))
                .map(|cg_id| cg_id.to_string())
                .unwrap_or_else(|| "unknown".to_owned());
            meta_metrics
                .table_info
                .with_label_values(&[
                    &job_id_str,
                    &table_id_str,
                    &fragment_id_str,
                    &table_name,
                    &table_type,
                    &compaction_group_id,
                ])
                .set(1);
        }
    }
}

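/// Re-reports the `relation_info` gauge for all tables, sources, and sinks.
/// The gauge is reset first, so each refresh publishes a complete snapshot of
/// the catalog's relations.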
pub async fn refresh_relation_info_metrics(
    catalog_controller: &CatalogControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let table_objects = match catalog_controller.list_table_objects().await {
        Ok(table_objects) => table_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table objects");
            return;
        }
    };

    let source_objects = match catalog_controller.list_source_objects().await {
        Ok(source_objects) => source_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get source objects");
            return;
        }
    };

    let sink_objects = match catalog_controller.list_sink_objects().await {
        Ok(sink_objects) => sink_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
            return;
        }
    };

    meta_metrics.relation_info.reset();

    for (id, db, schema, name, resource_group) in table_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                "table",
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in source_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                "source",
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in sink_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                "sink",
            ])
            .set(1);
    }
}

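/// Spawns a background task that refreshes the fragment and relation info
/// metrics every `COLLECT_INTERVAL_SECONDS`, returning its join handle plus a
/// oneshot shutdown sender.
///
/// A usage sketch, assuming the managers are already constructed:
///
/// ```ignore
/// let (handle, shutdown) = start_fragment_info_monitor(
///     metadata_manager,
///     hummock_manager,
///     Arc::new(MetaMetrics::default()),
/// );
/// // ... later, on shutdown:
/// let _ = shutdown.send(());
/// handle.await.unwrap();
/// ```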
pub fn start_fragment_info_monitor(
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    const COLLECT_INTERVAL_SECONDS: u64 = 60;

    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval =
            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for interval
                _ = monitor_interval.tick() => {},
                // Shutdown monitor
                _ = &mut shutdown_rx => {
                    tracing::info!("Fragment info monitor is stopped");
                    return;
                }
            }

            refresh_fragment_info_metrics(
                &metadata_manager.catalog_controller,
                &metadata_manager.cluster_controller,
                &hummock_manager,
                meta_metrics.clone(),
            )
            .await;

            refresh_relation_info_metrics(
                &metadata_manager.catalog_controller,
                meta_metrics.clone(),
            )
            .await;
        }
    });

    (join_handle, shutdown_tx)
}
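
// A minimal smoke-test sketch: it constructs an isolated `MetaMetrics`
// against a fresh registry via `for_test` and exercises one labeled gauge.
// The label value below is illustrative only.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn meta_metrics_smoke() {
        // A fresh registry keeps this test independent of the global one.
        let registry = Registry::new();
        let metrics = MetaMetrics::for_test(&registry);

        // Set a labeled gauge and read it back.
        metrics
            .worker_num
            .with_label_values(&["COMPUTE_NODE"])
            .set(3);
        assert_eq!(
            metrics
                .worker_num
                .with_label_values(&["COMPUTE_NODE"])
                .get(),
            3
        );
    }
}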