risingwave_meta/rpc/
metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::AtomicU64;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicF64, GenericGaugeVec};
21use prometheus::{
22    Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry, exponential_buckets,
23    histogram_opts, register_gauge_vec_with_registry, register_histogram_vec_with_registry,
24    register_histogram_with_registry, register_int_counter_vec_with_registry,
25    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
26};
27use risingwave_common::metrics::{
28    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
29};
30use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
31use risingwave_common::{
32    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33    register_guarded_int_gauge_vec_with_registry,
34};
35use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
36use risingwave_meta_model::WorkerId;
37use risingwave_object_store::object::object_metrics::{
38    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
39};
40use risingwave_pb::common::WorkerType;
41use thiserror_ext::AsReport;
42use tokio::sync::oneshot::Sender;
43use tokio::task::JoinHandle;
44
45use crate::controller::catalog::CatalogControllerRef;
46use crate::controller::cluster::ClusterControllerRef;
47use crate::controller::utils::PartialFragmentStateTables;
48use crate::hummock::HummockManagerRef;
49use crate::manager::MetadataManager;
50use crate::rpc::ElectionClientRef;
51
/// Prometheus metrics exposed by the meta node, covering cluster membership,
/// gRPC, barriers, recovery, Hummock (storage), object store, sources,
/// fragments and auto schema change. Registered through [`MetaMetrics::new`]
/// against a `prometheus::Registry`; `Clone` is cheap because the metric
/// collectors are shared handles.
#[derive(Clone)]
pub struct MetaMetrics {
    // ********************************** Meta ************************************
    /// The number of workers in the cluster.
    pub worker_num: IntGaugeVec,
    /// The roles of all meta nodes in the cluster.
    pub meta_type: IntGaugeVec,

    // ********************************** gRPC ************************************
    /// gRPC latency of meta services
    pub grpc_latency: HistogramVec,

    // ********************************** Barrier ************************************
    /// The duration from barrier injection to commit
    /// It is the sum of inflight-latency, sync-latency and wait-commit-latency
    pub barrier_latency: LabelGuardedHistogramVec<1>,
    /// The duration from barrier complete to commit
    pub barrier_wait_commit_latency: Histogram,
    /// Latency between each barrier send
    pub barrier_send_latency: LabelGuardedHistogramVec<1>,
    /// The number of all barriers. It is the sum of barriers that are in-flight or completed but
    /// waiting for other barriers
    pub all_barrier_nums: LabelGuardedIntGaugeVec<1>,
    /// The number of in-flight barriers
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec<1>,
    /// The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.
    pub last_committed_barrier_time: IntGaugeVec,

    // ********************************** Snapshot Backfill ***************************
    /// The barrier latency in seconds of `table_id` and snapshot backfill `barrier_type`
    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec<2>, // (table_id, barrier_type)
    /// The latency of commit epoch of `table_id`
    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec<1>, // (table_id, )
    /// The lags between the upstream epoch and the downstream epoch.
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec<1>, // (table_id, )
    /// The number of inflight barriers of `table_id`
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec<1>, // (table_id, )

    // ********************************** Recovery ************************************
    // Number of failed recovery attempts, labeled by recovery type.
    pub recovery_failure_cnt: IntCounterVec,
    // Latency of the recovery process, labeled by recovery type.
    pub recovery_latency: HistogramVec,

    // ********************************** Hummock ************************************
    /// Max committed epoch
    pub max_committed_epoch: IntGauge,
    /// Min committed epoch
    pub min_committed_epoch: IntGauge,
    /// The number of SSTs in each level
    pub level_sst_num: IntGaugeVec,
    /// The number of SSTs to be merged to next level in each level
    pub level_compact_cnt: IntGaugeVec,
    /// The number of compact tasks
    pub compact_frequency: IntCounterVec,
    /// Size of each level
    pub level_file_size: IntGaugeVec,
    /// Hummock version size
    pub version_size: IntGauge,
    /// The version Id of current version.
    pub current_version_id: IntGauge,
    /// The version id of checkpoint version.
    pub checkpoint_version_id: IntGauge,
    /// The smallest version id that is being pinned by worker nodes.
    pub min_pinned_version_id: IntGauge,
    /// The smallest version id that is being guarded by meta node safe points.
    pub min_safepoint_version_id: IntGauge,
    /// Compaction groups that are in write stop state.
    pub write_stop_compaction_groups: IntGaugeVec,
    /// The number of attempts to trigger full GC.
    pub full_gc_trigger_count: IntGauge,
    /// The number of candidate objects to delete after scanning object store.
    pub full_gc_candidate_object_count: Histogram,
    /// The number of objects to delete after filtering by meta node.
    pub full_gc_selected_object_count: Histogram,
    /// Hummock version stats
    pub version_stats: IntGaugeVec,
    /// Per materialized view stats in the current Hummock version
    pub materialized_view_stats: IntGaugeVec,
    /// Total number of objects that are no longer referenced by versions.
    pub stale_object_count: IntGauge,
    /// Total size of objects that are no longer referenced by versions.
    pub stale_object_size: IntGauge,
    /// Total number of objects that are still referenced by non-current versions.
    pub old_version_object_count: IntGauge,
    /// Total size of objects that are still referenced by non-current versions.
    pub old_version_object_size: IntGauge,
    /// Total number of objects that are referenced by time travel.
    pub time_travel_object_count: IntGauge,
    /// Total number of objects that are referenced by the current version.
    pub current_version_object_count: IntGauge,
    /// Total size of objects that are referenced by the current version.
    pub current_version_object_size: IntGauge,
    /// Total number of objects, including dangling objects.
    pub total_object_count: IntGauge,
    /// Total size of objects, including dangling objects.
    pub total_object_size: IntGauge,
    /// Number of objects per table change log.
    pub table_change_log_object_count: IntGaugeVec,
    /// Size of objects per table change log.
    pub table_change_log_object_size: IntGaugeVec,
    /// The number of hummock version delta log.
    pub delta_log_count: IntGauge,
    /// latency of version checkpoint
    pub version_checkpoint_latency: Histogram,
    /// Latency for hummock manager to acquire lock
    pub hummock_manager_lock_time: HistogramVec,
    /// Latency for hummock manager to really process a request after acquire the lock
    pub hummock_manager_real_process_time: HistogramVec,
    /// The number of compactions from one level to another level that have been skipped
    pub compact_skip_frequency: IntCounterVec,
    /// Bytes of lsm tree needed to reach balance
    pub compact_pending_bytes: IntGaugeVec,
    /// Per level compression ratio
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    /// Per level number of running compaction task
    pub level_compact_task_cnt: IntGaugeVec,
    // Shared timestamp used to track when the last observation happened.
    pub time_after_last_observation: Arc<AtomicU64>,
    // Level count of L0 compact tasks, labeled by (group, type).
    pub l0_compact_level_count: HistogramVec,
    // Total size of compact tasks issued to the state store, labeled by (group, type).
    pub compact_task_size: HistogramVec,
    // File count of compact tasks, labeled by (group, type).
    pub compact_task_file_count: HistogramVec,
    // Count of compact task batches, labeled by type.
    pub compact_task_batch_count: HistogramVec,
    // Count of triggered split-compaction-group operations, labeled by group.
    pub split_compaction_group_count: IntCounterVec,
    // Count of state tables per compaction group.
    pub state_table_count: IntGaugeVec,
    // Count of branched SSTs per compaction group.
    pub branched_sst_count: IntGaugeVec,
    // SST count of compact trivial-move tasks, labeled by group.
    pub compact_task_trivial_move_sst_count: HistogramVec,

    // Latency (ms) of consuming each compaction event.
    pub compaction_event_consumed_latency: Histogram,
    // Latency (ms) of each iteration of the compaction event loop.
    pub compaction_event_loop_iteration_latency: Histogram,

    // ********************************** Object Store ************************************
    // Object store related metrics (for backup/restore and version checkpoint)
    pub object_store_metric: Arc<ObjectStoreMetrics>,

    // ********************************** Source ************************************
    /// supervisor for which source is still up.
    pub source_is_up: LabelGuardedIntGaugeVec<2>,
    // Metrics for source enumerators, shared with the connector layer.
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

    // ********************************** Fragment ************************************
    /// A dummy gauge metric whose labels carry the mapping from actor id to fragment id
    pub actor_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry the mapping from table id to actor id
    pub table_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry the mapping from actor id to sink id
    pub sink_info: IntGaugeVec,

    /// Write throughput of commit epoch for each table
    pub table_write_throughput: IntCounterVec,

    /// The number of compaction groups that have been triggered to move
    pub merge_compaction_group_count: IntCounterVec,

    // ********************************** Auto Schema Change ************************************
    // Number of failed auto schema changes, labeled by (table_id, table_name).
    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec<2>,
    // Number of successful auto schema changes, labeled by (table_id, table_name).
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec<2>,
    // Latency of the auto schema change process, labeled by (table_id, table_name).
    pub auto_schema_change_latency: LabelGuardedHistogramVec<2>,

    // Latency of replaying a hummock version for time travel.
    pub time_travel_version_replay_latency: Histogram,

    // The number of compaction groups.
    pub compaction_group_count: IntGauge,
    // Size of each compaction group, labeled by group.
    pub compaction_group_size: IntGaugeVec,
    // File count of each compaction group, labeled by group.
    pub compaction_group_file_count: IntGaugeVec,
    // Throughput of each compaction group, labeled by group.
    pub compaction_group_throughput: IntGaugeVec,
}
215
216pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
217    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));
218
219impl MetaMetrics {
220    fn new(registry: &Registry) -> Self {
221        let opts = histogram_opts!(
222            "meta_grpc_duration_seconds",
223            "gRPC latency of meta services",
224            exponential_buckets(0.0001, 2.0, 20).unwrap() // max 52s
225        );
226        let grpc_latency =
227            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();
228
229        let opts = histogram_opts!(
230            "meta_barrier_duration_seconds",
231            "barrier latency",
232            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
233        );
234        let barrier_latency =
235            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
236                .unwrap();
237
238        let opts = histogram_opts!(
239            "meta_barrier_wait_commit_duration_seconds",
240            "barrier_wait_commit_latency",
241            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
242        );
243        let barrier_wait_commit_latency =
244            register_histogram_with_registry!(opts, registry).unwrap();
245
246        let opts = histogram_opts!(
247            "meta_barrier_send_duration_seconds",
248            "barrier send latency",
249            exponential_buckets(0.1, 1.5, 19).unwrap() // max 148s
250        );
251        let barrier_send_latency =
252            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
253                .unwrap();
254
255        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
256            "all_barrier_nums",
257            "num of of all_barrier",
258            &["database_id"],
259            registry
260        )
261        .unwrap();
262        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
263            "in_flight_barrier_nums",
264            "num of of in_flight_barrier",
265            &["database_id"],
266            registry
267        )
268        .unwrap();
269        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
270            "last_committed_barrier_time",
271            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
272            &["database_id"],
273            registry
274        )
275        .unwrap();
276
277        // snapshot backfill metrics
278        let opts = histogram_opts!(
279            "meta_snapshot_backfill_barrier_duration_seconds",
280            "snapshot backfill barrier latency",
281            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
282        );
283        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
284            opts,
285            &["table_id", "barrier_type"],
286            registry
287        )
288        .unwrap();
289        let opts = histogram_opts!(
290            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
291            "snapshot backfill barrier_wait_commit_latency",
292            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
293        );
294        let snapshot_backfill_wait_commit_latency =
295            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
296
297        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
298            "meta_snapshot_backfill_upstream_lag",
299            "snapshot backfill upstream_lag",
300            &["table_id"],
301            registry
302        )
303        .unwrap();
304        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
305            "meta_snapshot_backfill_inflight_barrier_num",
306            "snapshot backfill inflight_barrier_num",
307            &["table_id"],
308            registry
309        )
310        .unwrap();
311
312        let max_committed_epoch = register_int_gauge_with_registry!(
313            "storage_max_committed_epoch",
314            "max committed epoch",
315            registry
316        )
317        .unwrap();
318
319        let min_committed_epoch = register_int_gauge_with_registry!(
320            "storage_min_committed_epoch",
321            "min committed epoch",
322            registry
323        )
324        .unwrap();
325
326        let level_sst_num = register_int_gauge_vec_with_registry!(
327            "storage_level_sst_num",
328            "num of SSTs in each level",
329            &["level_index"],
330            registry
331        )
332        .unwrap();
333
334        let level_compact_cnt = register_int_gauge_vec_with_registry!(
335            "storage_level_compact_cnt",
336            "num of SSTs to be merged to next level in each level",
337            &["level_index"],
338            registry
339        )
340        .unwrap();
341
342        let compact_frequency = register_int_counter_vec_with_registry!(
343            "storage_level_compact_frequency",
344            "The number of compactions from one level to another level that have completed or failed.",
345            &["compactor", "group", "task_type", "result"],
346            registry
347        )
348        .unwrap();
349        let compact_skip_frequency = register_int_counter_vec_with_registry!(
350            "storage_skip_compact_frequency",
351            "The number of compactions from one level to another level that have been skipped.",
352            &["level", "type"],
353            registry
354        )
355        .unwrap();
356
357        let version_size =
358            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
359                .unwrap();
360
361        let current_version_id = register_int_gauge_with_registry!(
362            "storage_current_version_id",
363            "current version id",
364            registry
365        )
366        .unwrap();
367
368        let checkpoint_version_id = register_int_gauge_with_registry!(
369            "storage_checkpoint_version_id",
370            "checkpoint version id",
371            registry
372        )
373        .unwrap();
374
375        let min_pinned_version_id = register_int_gauge_with_registry!(
376            "storage_min_pinned_version_id",
377            "min pinned version id",
378            registry
379        )
380        .unwrap();
381
382        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
383            "storage_write_stop_compaction_groups",
384            "compaction groups of write stop state",
385            &["compaction_group_id"],
386            registry
387        )
388        .unwrap();
389
390        let full_gc_trigger_count = register_int_gauge_with_registry!(
391            "storage_full_gc_trigger_count",
392            "the number of attempts to trigger full GC",
393            registry
394        )
395        .unwrap();
396
397        let opts = histogram_opts!(
398            "storage_full_gc_candidate_object_count",
399            "the number of candidate object to delete after scanning object store",
400            exponential_buckets(1.0, 10.0, 6).unwrap()
401        );
402        let full_gc_candidate_object_count =
403            register_histogram_with_registry!(opts, registry).unwrap();
404
405        let opts = histogram_opts!(
406            "storage_full_gc_selected_object_count",
407            "the number of object to delete after filtering by meta node",
408            exponential_buckets(1.0, 10.0, 6).unwrap()
409        );
410        let full_gc_selected_object_count =
411            register_histogram_with_registry!(opts, registry).unwrap();
412
413        let min_safepoint_version_id = register_int_gauge_with_registry!(
414            "storage_min_safepoint_version_id",
415            "min safepoint version id",
416            registry
417        )
418        .unwrap();
419
420        let level_file_size = register_int_gauge_vec_with_registry!(
421            "storage_level_total_file_size",
422            "KBs total file bytes in each level",
423            &["level_index"],
424            registry
425        )
426        .unwrap();
427
428        let version_stats = register_int_gauge_vec_with_registry!(
429            "storage_version_stats",
430            "per table stats in current hummock version",
431            &["table_id", "metric"],
432            registry
433        )
434        .unwrap();
435
436        let materialized_view_stats = register_int_gauge_vec_with_registry!(
437            "storage_materialized_view_stats",
438            "per materialized view stats in current hummock version",
439            &["table_id", "metric"],
440            registry
441        )
442        .unwrap();
443
444        let stale_object_count = register_int_gauge_with_registry!(
445            "storage_stale_object_count",
446            "total number of objects that is no longer referenced by versions.",
447            registry
448        )
449        .unwrap();
450
451        let stale_object_size = register_int_gauge_with_registry!(
452            "storage_stale_object_size",
453            "total size of objects that is no longer referenced by versions.",
454            registry
455        )
456        .unwrap();
457
458        let old_version_object_count = register_int_gauge_with_registry!(
459            "storage_old_version_object_count",
460            "total number of objects that is still referenced by non-current versions",
461            registry
462        )
463        .unwrap();
464
465        let old_version_object_size = register_int_gauge_with_registry!(
466            "storage_old_version_object_size",
467            "total size of objects that is still referenced by non-current versions",
468            registry
469        )
470        .unwrap();
471
472        let current_version_object_count = register_int_gauge_with_registry!(
473            "storage_current_version_object_count",
474            "total number of objects that is referenced by current version",
475            registry
476        )
477        .unwrap();
478
479        let current_version_object_size = register_int_gauge_with_registry!(
480            "storage_current_version_object_size",
481            "total size of objects that is referenced by current version",
482            registry
483        )
484        .unwrap();
485
486        let total_object_count = register_int_gauge_with_registry!(
487            "storage_total_object_count",
488            "Total number of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
489            registry
490        ).unwrap();
491
492        let total_object_size = register_int_gauge_with_registry!(
493            "storage_total_object_size",
494            "Total size of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
495            registry
496        ).unwrap();
497
498        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
499            "storage_table_change_log_object_count",
500            "per table change log object count",
501            &["table_id"],
502            registry
503        )
504        .unwrap();
505
506        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
507            "storage_table_change_log_object_size",
508            "per table change log object size",
509            &["table_id"],
510            registry
511        )
512        .unwrap();
513
514        let time_travel_object_count = register_int_gauge_with_registry!(
515            "storage_time_travel_object_count",
516            "total number of objects that is referenced by time travel.",
517            registry
518        )
519        .unwrap();
520
521        let delta_log_count = register_int_gauge_with_registry!(
522            "storage_delta_log_count",
523            "total number of hummock version delta log",
524            registry
525        )
526        .unwrap();
527
528        let opts = histogram_opts!(
529            "storage_version_checkpoint_latency",
530            "hummock version checkpoint latency",
531            exponential_buckets(0.1, 1.5, 20).unwrap()
532        );
533        let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();
534
535        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
536            "hummock_manager_lock_time",
537            "latency for hummock manager to acquire the rwlock",
538            &["lock_name", "lock_type"],
539            registry
540        )
541        .unwrap();
542
543        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
544            "meta_hummock_manager_real_process_time",
545            "latency for hummock manager to really process the request",
546            &["method"],
547            registry
548        )
549        .unwrap();
550
551        let worker_num = register_int_gauge_vec_with_registry!(
552            "worker_num",
553            "number of nodes in the cluster",
554            &["worker_type"],
555            registry,
556        )
557        .unwrap();
558
559        let meta_type = register_int_gauge_vec_with_registry!(
560            "meta_num",
561            "role of meta nodes in the cluster",
562            &["worker_addr", "role"],
563            registry,
564        )
565        .unwrap();
566
567        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
568            "storage_compact_pending_bytes",
569            "bytes of lsm tree needed to reach balance",
570            &["group"],
571            registry
572        )
573        .unwrap();
574
575        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
576            "storage_compact_level_compression_ratio",
577            "compression ratio of each level of the lsm tree",
578            &["group", "level", "algorithm"],
579            registry
580        )
581        .unwrap();
582
583        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
584            "storage_level_compact_task_cnt",
585            "num of compact_task organized by group and level",
586            &["task"],
587            registry
588        )
589        .unwrap();
590        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
591
592        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
593            "recovery_failure_cnt",
594            "Number of failed recovery attempts",
595            &["recovery_type"],
596            registry
597        )
598        .unwrap();
599        let opts = histogram_opts!(
600            "recovery_latency",
601            "Latency of the recovery process",
602            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
603        );
604        let recovery_latency =
605            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();
606
607        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
608            "auto_schema_change_failure_cnt",
609            "Number of failed auto schema change",
610            &["table_id", "table_name"],
611            registry
612        )
613        .unwrap();
614
615        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
616            "auto_schema_change_success_cnt",
617            "Number of success auto schema change",
618            &["table_id", "table_name"],
619            registry
620        )
621        .unwrap();
622
623        let opts = histogram_opts!(
624            "auto_schema_change_latency",
625            "Latency of the auto schema change process",
626            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
627        );
628        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
629            opts,
630            &["table_id", "table_name"],
631            registry
632        )
633        .unwrap();
634
635        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
636            "source_status_is_up",
637            "source is up or not",
638            &["source_id", "source_name"],
639            registry
640        )
641        .unwrap();
642        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());
643
644        let actor_info = register_int_gauge_vec_with_registry!(
645            "actor_info",
646            "Mapping from actor id to (fragment id, compute node)",
647            &["actor_id", "fragment_id", "compute_node"],
648            registry
649        )
650        .unwrap();
651
652        let table_info = register_int_gauge_vec_with_registry!(
653            "table_info",
654            "Mapping from table id to (actor id, table name)",
655            &[
656                "materialized_view_id",
657                "table_id",
658                "fragment_id",
659                "table_name",
660                "table_type",
661                "compaction_group_id"
662            ],
663            registry
664        )
665        .unwrap();
666
667        let sink_info = register_int_gauge_vec_with_registry!(
668            "sink_info",
669            "Mapping from actor id to (actor id, sink name)",
670            &["actor_id", "sink_id", "sink_name",],
671            registry
672        )
673        .unwrap();
674
675        let l0_compact_level_count = register_histogram_vec_with_registry!(
676            "storage_l0_compact_level_count",
677            "level_count of l0 compact task",
678            &["group", "type"],
679            registry
680        )
681        .unwrap();
682
683        let opts = histogram_opts!(
684            "storage_compact_task_size",
685            "Total size of compact that have been issued to state store",
686            exponential_buckets(1048576.0, 2.0, 16).unwrap()
687        );
688
689        let compact_task_size =
690            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();
691
692        let compact_task_file_count = register_histogram_vec_with_registry!(
693            "storage_compact_task_file_count",
694            "file count of compact task",
695            &["group", "type"],
696            registry
697        )
698        .unwrap();
699        let opts = histogram_opts!(
700            "storage_compact_task_batch_count",
701            "count of compact task batch",
702            exponential_buckets(1.0, 2.0, 8).unwrap()
703        );
704        let compact_task_batch_count =
705            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();
706
707        let table_write_throughput = register_int_counter_vec_with_registry!(
708            "storage_commit_write_throughput",
709            "The number of compactions from one level to another level that have been skipped.",
710            &["table_id"],
711            registry
712        )
713        .unwrap();
714
715        let split_compaction_group_count = register_int_counter_vec_with_registry!(
716            "storage_split_compaction_group_count",
717            "Count of trigger split compaction group",
718            &["group"],
719            registry
720        )
721        .unwrap();
722
723        let state_table_count = register_int_gauge_vec_with_registry!(
724            "storage_state_table_count",
725            "Count of stable table per compaction group",
726            &["group"],
727            registry
728        )
729        .unwrap();
730
731        let branched_sst_count = register_int_gauge_vec_with_registry!(
732            "storage_branched_sst_count",
733            "Count of branched sst per compaction group",
734            &["group"],
735            registry
736        )
737        .unwrap();
738
739        let opts = histogram_opts!(
740            "storage_compaction_event_consumed_latency",
741            "The latency(ms) of each event being consumed",
742            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
743        );
744        let compaction_event_consumed_latency =
745            register_histogram_with_registry!(opts, registry).unwrap();
746
747        let opts = histogram_opts!(
748            "storage_compaction_event_loop_iteration_latency",
749            "The latency(ms) of each iteration of the compaction event loop",
750            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
751        );
752        let compaction_event_loop_iteration_latency =
753            register_histogram_with_registry!(opts, registry).unwrap();
754
755        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
756            "storage_merge_compaction_group_count",
757            "Count of trigger merge compaction group",
758            &["group"],
759            registry
760        )
761        .unwrap();
762
763        let opts = histogram_opts!(
764            "storage_time_travel_version_replay_latency",
765            "The latency(ms) of replaying a hummock version for time travel",
766            exponential_buckets(0.01, 10.0, 6).unwrap()
767        );
768        let time_travel_version_replay_latency =
769            register_histogram_with_registry!(opts, registry).unwrap();
770
771        let compaction_group_count = register_int_gauge_with_registry!(
772            "storage_compaction_group_count",
773            "The number of compaction groups",
774            registry,
775        )
776        .unwrap();
777
778        let compaction_group_size = register_int_gauge_vec_with_registry!(
779            "storage_compaction_group_size",
780            "The size of compaction group",
781            &["group"],
782            registry
783        )
784        .unwrap();
785
786        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
787            "storage_compaction_group_file_count",
788            "The file count of compaction group",
789            &["group"],
790            registry
791        )
792        .unwrap();
793
794        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
795            "storage_compaction_group_throughput",
796            "The throughput of compaction group",
797            &["group"],
798            registry
799        )
800        .unwrap();
801
802        let opts = histogram_opts!(
803            "storage_compact_task_trivial_move_sst_count",
804            "sst count of compact trivial-move task",
805            exponential_buckets(1.0, 2.0, 8).unwrap()
806        );
807        let compact_task_trivial_move_sst_count =
808            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();
809
810        Self {
811            grpc_latency,
812            barrier_latency,
813            barrier_wait_commit_latency,
814            barrier_send_latency,
815            all_barrier_nums,
816            in_flight_barrier_nums,
817            last_committed_barrier_time,
818            snapshot_backfill_barrier_latency,
819            snapshot_backfill_wait_commit_latency,
820            snapshot_backfill_lag,
821            snapshot_backfill_inflight_barrier_num,
822            recovery_failure_cnt,
823            recovery_latency,
824
825            max_committed_epoch,
826            min_committed_epoch,
827            level_sst_num,
828            level_compact_cnt,
829            compact_frequency,
830            compact_skip_frequency,
831            level_file_size,
832            version_size,
833            version_stats,
834            materialized_view_stats,
835            stale_object_count,
836            stale_object_size,
837            old_version_object_count,
838            old_version_object_size,
839            time_travel_object_count,
840            current_version_object_count,
841            current_version_object_size,
842            total_object_count,
843            total_object_size,
844            table_change_log_object_count,
845            table_change_log_object_size,
846            delta_log_count,
847            version_checkpoint_latency,
848            current_version_id,
849            checkpoint_version_id,
850            min_pinned_version_id,
851            min_safepoint_version_id,
852            write_stop_compaction_groups,
853            full_gc_trigger_count,
854            full_gc_candidate_object_count,
855            full_gc_selected_object_count,
856            hummock_manager_lock_time,
857            hummock_manager_real_process_time,
858            time_after_last_observation: Arc::new(AtomicU64::new(0)),
859            worker_num,
860            meta_type,
861            compact_pending_bytes,
862            compact_level_compression_ratio,
863            level_compact_task_cnt,
864            object_store_metric,
865            source_is_up,
866            source_enumerator_metrics,
867            actor_info,
868            table_info,
869            sink_info,
870            l0_compact_level_count,
871            compact_task_size,
872            compact_task_file_count,
873            compact_task_batch_count,
874            compact_task_trivial_move_sst_count,
875            table_write_throughput,
876            split_compaction_group_count,
877            state_table_count,
878            branched_sst_count,
879            compaction_event_consumed_latency,
880            compaction_event_loop_iteration_latency,
881            auto_schema_change_failure_cnt,
882            auto_schema_change_success_cnt,
883            auto_schema_change_latency,
884            merge_compaction_group_count,
885            time_travel_version_replay_latency,
886            compaction_group_count,
887            compaction_group_size,
888            compaction_group_file_count,
889            compaction_group_throughput,
890        }
891    }
892
    /// Creates a fresh `MetaMetrics` registered on the given `registry`,
    /// for use in tests. Currently just delegates to [`Self::new`].
    #[cfg(test)]
    pub fn for_test(registry: &Registry) -> Self {
        Self::new(registry)
    }
897}
/// The default instance is a clone of the process-wide global metrics
/// (`GLOBAL_META_METRICS`), so every `Default`-constructed handle shares the
/// same underlying collectors rather than registering new ones.
impl Default for MetaMetrics {
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}
903
904pub fn start_worker_info_monitor(
905    metadata_manager: MetadataManager,
906    election_client: ElectionClientRef,
907    interval: Duration,
908    meta_metrics: Arc<MetaMetrics>,
909) -> (JoinHandle<()>, Sender<()>) {
910    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
911    let join_handle = tokio::spawn(async move {
912        let mut monitor_interval = tokio::time::interval(interval);
913        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
914        loop {
915            tokio::select! {
916                // Wait for interval
917                _ = monitor_interval.tick() => {},
918                // Shutdown monitor
919                _ = &mut shutdown_rx => {
920                    tracing::info!("Worker number monitor is stopped");
921                    return;
922                }
923            }
924
925            let node_map = match metadata_manager.count_worker_node().await {
926                Ok(node_map) => node_map,
927                Err(err) => {
928                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
929                    continue;
930                }
931            };
932
933            // Reset metrics to clean the stale labels e.g. invalid lease ids
934            meta_metrics.worker_num.reset();
935            meta_metrics.meta_type.reset();
936
937            for (worker_type, worker_num) in node_map {
938                meta_metrics
939                    .worker_num
940                    .with_label_values(&[(worker_type.as_str_name())])
941                    .set(worker_num as i64);
942            }
943            if let Ok(meta_members) = election_client.get_members().await {
944                meta_metrics
945                    .worker_num
946                    .with_label_values(&[WorkerType::Meta.as_str_name()])
947                    .set(meta_members.len() as i64);
948                meta_members.into_iter().for_each(|m| {
949                    let role = if m.is_leader { "leader" } else { "follower" };
950                    meta_metrics
951                        .meta_type
952                        .with_label_values(&[&m.id, role])
953                        .set(1);
954                });
955            }
956        }
957    });
958
959    (join_handle, shutdown_tx)
960}
961
962pub async fn refresh_fragment_info_metrics(
963    catalog_controller: &CatalogControllerRef,
964    cluster_controller: &ClusterControllerRef,
965    hummock_manager: &HummockManagerRef,
966    meta_metrics: Arc<MetaMetrics>,
967) {
968    let worker_nodes = match cluster_controller
969        .list_workers(Some(WorkerType::ComputeNode.into()), None)
970        .await
971    {
972        Ok(worker_nodes) => worker_nodes,
973        Err(err) => {
974            tracing::warn!(error=%err.as_report(), "fail to list worker node");
975            return;
976        }
977    };
978    let actor_locations = match catalog_controller.list_actor_locations().await {
979        Ok(actor_locations) => actor_locations,
980        Err(err) => {
981            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
982            return;
983        }
984    };
985    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
986        Ok(sink_actor_mapping) => sink_actor_mapping,
987        Err(err) => {
988            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
989            return;
990        }
991    };
992    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
993        Ok(fragment_state_tables) => fragment_state_tables,
994        Err(err) => {
995            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
996            return;
997        }
998    };
999    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
1000        Ok(mapping) => mapping,
1001        Err(err) => {
1002            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
1003            return;
1004        }
1005    };
1006
1007    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
1008        .into_iter()
1009        .map(|worker_node| {
1010            let addr = match worker_node.host {
1011                Some(host) => format!("{}:{}", host.host, host.port),
1012                None => "".to_owned(),
1013            };
1014            (worker_node.id as WorkerId, addr)
1015        })
1016        .collect();
1017    let table_compaction_group_id_mapping = hummock_manager
1018        .get_table_compaction_group_id_mapping()
1019        .await;
1020
1021    // Start fresh with a reset to clear all outdated labels. This is safe since we always
1022    // report full info on each interval.
1023    meta_metrics.actor_info.reset();
1024    meta_metrics.table_info.reset();
1025    meta_metrics.sink_info.reset();
1026    for actor_location in actor_locations {
1027        let actor_id_str = actor_location.actor_id.to_string();
1028        let fragment_id_str = actor_location.fragment_id.to_string();
1029        // Report a dummy gauge metrics with (fragment id, actor id, node
1030        // address) as its label
1031        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
1032            meta_metrics
1033                .actor_info
1034                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
1035                .set(1);
1036        }
1037    }
1038    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
1039        let sink_id_str = sink_id.to_string();
1040        for actor_id in actor_ids {
1041            let actor_id_str = actor_id.to_string();
1042            meta_metrics
1043                .sink_info
1044                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
1045                .set(1);
1046        }
1047    }
1048    for PartialFragmentStateTables {
1049        fragment_id,
1050        job_id,
1051        state_table_ids,
1052    } in fragment_state_tables
1053    {
1054        let fragment_id_str = fragment_id.to_string();
1055        let job_id_str = job_id.to_string();
1056        for table_id in state_table_ids.into_inner() {
1057            let table_id_str = table_id.to_string();
1058            let (table_name, table_type) = table_name_and_type_mapping
1059                .get(&table_id)
1060                .cloned()
1061                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
1062            let compaction_group_id = table_compaction_group_id_mapping
1063                .get(&(table_id as u32))
1064                .map(|cg_id| cg_id.to_string())
1065                .unwrap_or_else(|| "unknown".to_owned());
1066            meta_metrics
1067                .table_info
1068                .with_label_values(&[
1069                    &job_id_str,
1070                    &table_id_str,
1071                    &fragment_id_str,
1072                    &table_name,
1073                    &table_type,
1074                    &compaction_group_id,
1075                ])
1076                .set(1);
1077        }
1078    }
1079}
1080
1081pub fn start_fragment_info_monitor(
1082    metadata_manager: MetadataManager,
1083    hummock_manager: HummockManagerRef,
1084    meta_metrics: Arc<MetaMetrics>,
1085) -> (JoinHandle<()>, Sender<()>) {
1086    const COLLECT_INTERVAL_SECONDS: u64 = 60;
1087
1088    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1089    let join_handle = tokio::spawn(async move {
1090        let mut monitor_interval =
1091            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
1092        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1093        loop {
1094            tokio::select! {
1095                // Wait for interval
1096                _ = monitor_interval.tick() => {},
1097                // Shutdown monitor
1098                _ = &mut shutdown_rx => {
1099                    tracing::info!("Fragment info monitor is stopped");
1100                    return;
1101                }
1102            }
1103
1104            refresh_fragment_info_metrics(
1105                &metadata_manager.catalog_controller,
1106                &metadata_manager.cluster_controller,
1107                &hummock_manager,
1108                meta_metrics.clone(),
1109            )
1110            .await;
1111        }
1112    });
1113
1114    (join_handle, shutdown_tx)
1115}