risingwave_meta/rpc/metrics.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicF64, GenericGaugeVec};
use prometheus::{
    GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_meta_model::WorkerId;
use risingwave_object_store::object::object_metrics::{
    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
};
use risingwave_pb::common::WorkerType;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::ClusterControllerRef;
use crate::controller::system_param::SystemParamsControllerRef;
use crate::controller::utils::PartialFragmentStateTables;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::rpc::ElectionClientRef;
#[derive(Clone)]
pub struct MetaMetrics {
    // ********************************** Meta ************************************
    /// The number of workers in the cluster.
    pub worker_num: IntGaugeVec,
    /// The roles of all meta nodes in the cluster.
    pub meta_type: IntGaugeVec,

    // ********************************** gRPC ************************************
    /// gRPC latency of meta services
    pub grpc_latency: HistogramVec,

    // ********************************** Barrier ************************************
    /// The duration from barrier injection to commit.
    /// It is the sum of inflight-latency, sync-latency and wait-commit-latency.
    pub barrier_latency: LabelGuardedHistogramVec,
    /// The duration from barrier complete to commit
    pub barrier_wait_commit_latency: Histogram,
    /// Latency between each barrier send
    pub barrier_send_latency: LabelGuardedHistogramVec,
    /// The number of all barriers. It is the sum of barriers that are in-flight or completed but
    /// waiting for other barriers.
    pub all_barrier_nums: LabelGuardedIntGaugeVec,
    /// The number of in-flight barriers
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
    /// The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.
    pub last_committed_barrier_time: IntGaugeVec,
    /// The barrier interval of each database
    pub barrier_interval_by_database: GaugeVec,

    // ********************************** Snapshot Backfill ***************************
    /// The barrier latency in seconds of `table_id` and snapshot backfill `barrier_type`
    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec, // (table_id, barrier_type)
    /// The latency of commit epoch of `table_id`
    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec, // (table_id, )
    /// The lag between the upstream epoch and the downstream epoch.
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec, // (table_id, )
    /// The number of in-flight barriers of `table_id`
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec, // (table_id, )

    // ********************************** Recovery ************************************
    pub recovery_failure_cnt: IntCounterVec,
    pub recovery_latency: HistogramVec,

    // ********************************** Hummock ************************************
    /// Max committed epoch
    pub max_committed_epoch: IntGauge,
    /// Min committed epoch
    pub min_committed_epoch: IntGauge,
    /// The number of SSTs in each level
    pub level_sst_num: IntGaugeVec,
    /// The number of SSTs to be merged to next level in each level
    pub level_compact_cnt: IntGaugeVec,
    /// The number of compact tasks
    pub compact_frequency: IntCounterVec,
    /// Size of each level
    pub level_file_size: IntGaugeVec,
    /// Hummock version size
    pub version_size: IntGauge,
    /// The version id of the current version.
    pub current_version_id: IntGauge,
    /// The version id of the checkpoint version.
    pub checkpoint_version_id: IntGauge,
    /// The smallest version id that is being pinned by worker nodes.
    pub min_pinned_version_id: IntGauge,
    /// The smallest version id that is being guarded by meta node safe points.
    pub min_safepoint_version_id: IntGauge,
    /// Compaction groups that are in the write-stop state.
    pub write_stop_compaction_groups: IntGaugeVec,
    /// The number of attempts to trigger full GC.
    pub full_gc_trigger_count: IntGauge,
    /// The number of candidate objects to delete after scanning the object store.
    pub full_gc_candidate_object_count: Histogram,
    /// The number of objects to delete after filtering by the meta node.
    pub full_gc_selected_object_count: Histogram,
    /// Per-table stats in the current hummock version.
    pub version_stats: IntGaugeVec,
    /// Per-materialized-view stats in the current hummock version.
    pub materialized_view_stats: IntGaugeVec,
    /// Total number of objects that are no longer referenced by versions.
    pub stale_object_count: IntGauge,
    /// Total size of objects that are no longer referenced by versions.
    pub stale_object_size: IntGauge,
    /// Total number of objects that are still referenced by non-current versions.
    pub old_version_object_count: IntGauge,
    /// Total size of objects that are still referenced by non-current versions.
    pub old_version_object_size: IntGauge,
    /// Total number of objects that are referenced by time travel.
    pub time_travel_object_count: IntGauge,
    /// Total number of objects that are referenced by the current version.
    pub current_version_object_count: IntGauge,
    /// Total size of objects that are referenced by the current version.
    pub current_version_object_size: IntGauge,
    /// Total number of objects, including dangling objects.
    pub total_object_count: IntGauge,
    /// Total size of objects, including dangling objects.
    pub total_object_size: IntGauge,
    /// Number of objects per table change log.
    pub table_change_log_object_count: IntGaugeVec,
    /// Size of objects per table change log.
    pub table_change_log_object_size: IntGaugeVec,
    /// The number of hummock version delta logs.
    pub delta_log_count: IntGauge,
    /// Latency of version checkpoint.
    pub version_checkpoint_latency: Histogram,
    /// Latency for hummock manager to acquire lock.
    pub hummock_manager_lock_time: HistogramVec,
    /// Latency for hummock manager to actually process a request after acquiring the lock.
    pub hummock_manager_real_process_time: HistogramVec,
    /// The number of compactions from one level to another level that have been skipped.
    pub compact_skip_frequency: IntCounterVec,
    /// Bytes of lsm tree needed to reach balance.
    pub compact_pending_bytes: IntGaugeVec,
    /// Per level compression ratio.
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    /// Per level number of running compaction tasks.
    pub level_compact_task_cnt: IntGaugeVec,
    pub time_after_last_observation: Arc<AtomicU64>,
    pub l0_compact_level_count: HistogramVec,
    pub compact_task_size: HistogramVec,
    pub compact_task_file_count: HistogramVec,
    pub compact_task_batch_count: HistogramVec,
    pub split_compaction_group_count: IntCounterVec,
    pub state_table_count: IntGaugeVec,
    pub branched_sst_count: IntGaugeVec,
    pub compact_task_trivial_move_sst_count: HistogramVec,

    pub compaction_event_consumed_latency: Histogram,
    pub compaction_event_loop_iteration_latency: Histogram,

    // ********************************** Object Store ************************************
    // Object store related metrics (for backup/restore and version checkpoint)
    pub object_store_metric: Arc<ObjectStoreMetrics>,

    // ********************************** Source ************************************
    /// Whether each source is still up.
    pub source_is_up: LabelGuardedIntGaugeVec,
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

    // ********************************** Fragment ************************************
    /// A dummy gauge metric whose labels carry the mapping from actor id to fragment id.
    pub actor_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry the mapping from table id to fragment id.
    pub table_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry the mapping from actor id to sink id.
    pub sink_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry relation info.
    pub relation_info: IntGaugeVec,

    // ********************************** System Params ************************************
    /// A dummy gauge metric with labels carrying system parameter info.
    /// Labels: (name, value)
    pub system_param_info: IntGaugeVec,

    /// Write throughput of commit epoch for each table.
    pub table_write_throughput: IntCounterVec,

    /// The number of compaction group merges that have been triggered.
    pub merge_compaction_group_count: IntCounterVec,

    // ********************************** Auto Schema Change ************************************
    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_latency: LabelGuardedHistogramVec,

    pub time_travel_version_replay_latency: Histogram,

    pub compaction_group_count: IntGauge,
    pub compaction_group_size: IntGaugeVec,
    pub compaction_group_file_count: IntGaugeVec,
    pub compaction_group_throughput: IntGaugeVec,
}

pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));

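// Illustrative usage (a sketch, not code from this crate): components obtain a
// handle via `GLOBAL_META_METRICS.clone()` or `MetaMetrics::default()` and
// record observations through the label vectors, e.g.
//
//     let metrics = GLOBAL_META_METRICS.clone();
//     metrics
//         .grpc_latency
//         .with_label_values(&["/meta.ClusterService/Heartbeat"])
//         .observe(0.002);
//
// The gRPC path label above is hypothetical; real label values are supplied by
// the call sites.
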
impl MetaMetrics {
    fn new(registry: &Registry) -> Self {
        let opts = histogram_opts!(
            "meta_grpc_duration_seconds",
            "gRPC latency of meta services",
            exponential_buckets(0.0001, 2.0, 20).unwrap() // max 52s
        );
        let grpc_latency =
            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_duration_seconds",
            "barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let barrier_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();

        let opts = histogram_opts!(
            "meta_barrier_wait_commit_duration_seconds",
            "barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let barrier_wait_commit_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_send_duration_seconds",
            "barrier send latency",
            exponential_buckets(0.1, 1.5, 19).unwrap() // max 148s
        );
        let barrier_send_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();
        let barrier_interval_by_database = register_gauge_vec_with_registry!(
            "meta_barrier_interval_by_database",
            "barrier interval of each database",
            &["database_id"],
            registry
        )
        .unwrap();

        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "all_barrier_nums",
            "num of all barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "in_flight_barrier_nums",
            "num of in-flight barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
            "last_committed_barrier_time",
            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
            &["database_id"],
            registry
        )
        .unwrap();

        // snapshot backfill metrics
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_duration_seconds",
            "snapshot backfill barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "barrier_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
            "snapshot backfill barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let snapshot_backfill_wait_commit_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();

        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_upstream_lag",
            "snapshot backfill upstream_lag",
            &["table_id"],
            registry
        )
        .unwrap();
        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_inflight_barrier_num",
            "snapshot backfill inflight_barrier_num",
            &["table_id"],
            registry
        )
        .unwrap();

        let max_committed_epoch = register_int_gauge_with_registry!(
            "storage_max_committed_epoch",
            "max committed epoch",
            registry
        )
        .unwrap();

        let min_committed_epoch = register_int_gauge_with_registry!(
            "storage_min_committed_epoch",
            "min committed epoch",
            registry
        )
        .unwrap();

        let level_sst_num = register_int_gauge_vec_with_registry!(
            "storage_level_sst_num",
            "num of SSTs in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let level_compact_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_cnt",
            "num of SSTs to be merged to next level in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let compact_frequency = register_int_counter_vec_with_registry!(
            "storage_level_compact_frequency",
            "The number of compactions from one level to another level that have completed or failed.",
            &["compactor", "group", "task_type", "result"],
            registry
        )
        .unwrap();
        let compact_skip_frequency = register_int_counter_vec_with_registry!(
            "storage_skip_compact_frequency",
            "The number of compactions from one level to another level that have been skipped.",
            &["level", "type"],
            registry
        )
        .unwrap();

        let version_size =
            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
                .unwrap();

        let current_version_id = register_int_gauge_with_registry!(
            "storage_current_version_id",
            "current version id",
            registry
        )
        .unwrap();

        let checkpoint_version_id = register_int_gauge_with_registry!(
            "storage_checkpoint_version_id",
            "checkpoint version id",
            registry
        )
        .unwrap();

        let min_pinned_version_id = register_int_gauge_with_registry!(
            "storage_min_pinned_version_id",
            "min pinned version id",
            registry
        )
        .unwrap();

        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
            "storage_write_stop_compaction_groups",
            "compaction groups of write stop state",
            &["compaction_group_id"],
            registry
        )
        .unwrap();

        let full_gc_trigger_count = register_int_gauge_with_registry!(
            "storage_full_gc_trigger_count",
            "the number of attempts to trigger full GC",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_candidate_object_count",
            "the number of candidate objects to delete after scanning the object store",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_candidate_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_selected_object_count",
            "the number of objects to delete after filtering by the meta node",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_selected_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let min_safepoint_version_id = register_int_gauge_with_registry!(
            "storage_min_safepoint_version_id",
            "min safepoint version id",
            registry
        )
        .unwrap();

        let level_file_size = register_int_gauge_vec_with_registry!(
            "storage_level_total_file_size",
            "total file size of each level in KBs",
            &["level_index"],
            registry
        )
        .unwrap();

        let version_stats = register_int_gauge_vec_with_registry!(
            "storage_version_stats",
            "per table stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let materialized_view_stats = register_int_gauge_vec_with_registry!(
            "storage_materialized_view_stats",
            "per materialized view stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let stale_object_count = register_int_gauge_with_registry!(
            "storage_stale_object_count",
            "total number of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let stale_object_size = register_int_gauge_with_registry!(
            "storage_stale_object_size",
            "total size of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let old_version_object_count = register_int_gauge_with_registry!(
            "storage_old_version_object_count",
            "total number of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let old_version_object_size = register_int_gauge_with_registry!(
            "storage_old_version_object_size",
            "total size of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let current_version_object_count = register_int_gauge_with_registry!(
            "storage_current_version_object_count",
            "total number of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let current_version_object_size = register_int_gauge_with_registry!(
            "storage_current_version_object_size",
            "total size of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let total_object_count = register_int_gauge_with_registry!(
            "storage_total_object_count",
            "Total number of objects, including dangling objects. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let total_object_size = register_int_gauge_with_registry!(
            "storage_total_object_size",
            "Total size of objects, including dangling objects. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_count",
            "per table change log object count",
            &["table_id"],
            registry
        )
        .unwrap();

        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_size",
            "per table change log object size",
            &["table_id"],
            registry
        )
        .unwrap();

        let time_travel_object_count = register_int_gauge_with_registry!(
            "storage_time_travel_object_count",
            "total number of objects that are referenced by time travel.",
            registry
        )
        .unwrap();

        let delta_log_count = register_int_gauge_with_registry!(
            "storage_delta_log_count",
            "total number of hummock version delta logs",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_version_checkpoint_latency",
            "hummock version checkpoint latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();

        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
            "hummock_manager_lock_time",
            "latency for hummock manager to acquire the rwlock",
            &["lock_name", "lock_type"],
            registry
        )
        .unwrap();

        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
            "meta_hummock_manager_real_process_time",
            "latency for hummock manager to really process the request",
            &["method"],
            registry
        )
        .unwrap();

        let worker_num = register_int_gauge_vec_with_registry!(
            "worker_num",
            "number of nodes in the cluster",
            &["worker_type"],
            registry,
        )
        .unwrap();

        let meta_type = register_int_gauge_vec_with_registry!(
            "meta_num",
            "role of meta nodes in the cluster",
            &["worker_addr", "role"],
            registry,
        )
        .unwrap();

        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
            "storage_compact_pending_bytes",
            "bytes of lsm tree needed to reach balance",
            &["group"],
            registry
        )
        .unwrap();

        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
            "storage_compact_level_compression_ratio",
            "compression ratio of each level of the lsm tree",
            &["group", "level", "algorithm"],
            registry
        )
        .unwrap();

        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_task_cnt",
            "num of compact_task organized by group and level",
            &["task"],
            registry
        )
        .unwrap();
        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());

        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
            "recovery_failure_cnt",
            "Number of failed recovery attempts",
            &["recovery_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "recovery_latency",
            "Latency of the recovery process",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let recovery_latency =
            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();

        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_failure_cnt",
            "Number of failed auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_success_cnt",
            "Number of successful auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "auto_schema_change_latency",
            "Latency of the auto schema change process",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
            "source_status_is_up",
            "source is up or not",
            &["source_id", "source_name"],
            registry
        )
        .unwrap();
        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());

        let actor_info = register_int_gauge_vec_with_registry!(
            "actor_info",
            "Mapping from actor id to (fragment id, compute node)",
            &["actor_id", "fragment_id", "compute_node"],
            registry
        )
        .unwrap();

        let table_info = register_int_gauge_vec_with_registry!(
            "table_info",
            "Mapping from table id to (materialized view id, fragment id, table name, table type, compaction group id)",
            &[
                "materialized_view_id",
                "table_id",
                "fragment_id",
                "table_name",
                "table_type",
                "compaction_group_id"
            ],
            registry
        )
        .unwrap();

        let sink_info = register_int_gauge_vec_with_registry!(
            "sink_info",
            "Mapping from actor id to (sink id, sink name)",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let relation_info = register_int_gauge_vec_with_registry!(
            "relation_info",
            "Information of the database relation (table/source/sink)",
            &["id", "database", "schema", "name", "resource_group", "type"],
            registry
        )
        .unwrap();

        let l0_compact_level_count = register_histogram_vec_with_registry!(
            "storage_l0_compact_level_count",
            "level_count of l0 compact task",
            &["group", "type"],
            registry
        )
        .unwrap();

        // System parameter info
        let system_param_info = register_int_gauge_vec_with_registry!(
            "system_param_info",
            "Information of system parameters",
            &["name", "value"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_size",
            "Total size of compact tasks that have been issued to the state store",
            exponential_buckets(1048576.0, 2.0, 16).unwrap()
        );

        let compact_task_size =
            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();

        let compact_task_file_count = register_histogram_vec_with_registry!(
            "storage_compact_task_file_count",
            "file count of compact task",
            &["group", "type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "storage_compact_task_batch_count",
            "count of compact task batch",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_batch_count =
            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();

        let table_write_throughput = register_int_counter_vec_with_registry!(
            "storage_commit_write_throughput",
            "The write throughput of commit epoch per table.",
            &["table_id"],
            registry
        )
        .unwrap();

        let split_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_split_compaction_group_count",
            "Count of triggered compaction group splits",
            &["group"],
            registry
        )
        .unwrap();

        let state_table_count = register_int_gauge_vec_with_registry!(
            "storage_state_table_count",
            "Count of state tables per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let branched_sst_count = register_int_gauge_vec_with_registry!(
            "storage_branched_sst_count",
            "Count of branched SSTs per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_consumed_latency",
            "The latency (ms) of each event being consumed",
            exponential_buckets(1.0, 1.5, 30).unwrap() // max ~128s
        );
        let compaction_event_consumed_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_loop_iteration_latency",
            "The latency (ms) of each iteration of the compaction event loop",
            exponential_buckets(1.0, 1.5, 30).unwrap() // max ~128s
        );
        let compaction_event_loop_iteration_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_merge_compaction_group_count",
            "Count of triggered compaction group merges",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_time_travel_version_replay_latency",
            "The latency (ms) of replaying a hummock version for time travel",
            exponential_buckets(0.01, 10.0, 6).unwrap()
        );
        let time_travel_version_replay_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let compaction_group_count = register_int_gauge_with_registry!(
            "storage_compaction_group_count",
            "The number of compaction groups",
            registry,
        )
        .unwrap();

        let compaction_group_size = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_size",
            "The size of compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_file_count",
            "The file count of compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_throughput",
            "The throughput of compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_trivial_move_sst_count",
            "sst count of compact trivial-move task",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_trivial_move_sst_count =
            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();

        Self {
            grpc_latency,
            barrier_latency,
            barrier_wait_commit_latency,
            barrier_send_latency,
            all_barrier_nums,
            in_flight_barrier_nums,
            last_committed_barrier_time,
            barrier_interval_by_database,
            snapshot_backfill_barrier_latency,
            snapshot_backfill_wait_commit_latency,
            snapshot_backfill_lag,
            snapshot_backfill_inflight_barrier_num,
            recovery_failure_cnt,
            recovery_latency,

            max_committed_epoch,
            min_committed_epoch,
            level_sst_num,
            level_compact_cnt,
            compact_frequency,
            compact_skip_frequency,
            level_file_size,
            version_size,
            version_stats,
            materialized_view_stats,
            stale_object_count,
            stale_object_size,
            old_version_object_count,
            old_version_object_size,
            time_travel_object_count,
            current_version_object_count,
            current_version_object_size,
            total_object_count,
            total_object_size,
            table_change_log_object_count,
            table_change_log_object_size,
            delta_log_count,
            version_checkpoint_latency,
            current_version_id,
            checkpoint_version_id,
            min_pinned_version_id,
            min_safepoint_version_id,
            write_stop_compaction_groups,
            full_gc_trigger_count,
            full_gc_candidate_object_count,
            full_gc_selected_object_count,
            hummock_manager_lock_time,
            hummock_manager_real_process_time,
            time_after_last_observation: Arc::new(AtomicU64::new(0)),
            worker_num,
            meta_type,
            compact_pending_bytes,
            compact_level_compression_ratio,
            level_compact_task_cnt,
            object_store_metric,
            source_is_up,
            source_enumerator_metrics,
            actor_info,
            table_info,
            sink_info,
            relation_info,
            system_param_info,
            l0_compact_level_count,
            compact_task_size,
            compact_task_file_count,
            compact_task_batch_count,
            compact_task_trivial_move_sst_count,
            table_write_throughput,
            split_compaction_group_count,
            state_table_count,
            branched_sst_count,
            compaction_event_consumed_latency,
            compaction_event_loop_iteration_latency,
            auto_schema_change_failure_cnt,
            auto_schema_change_success_cnt,
            auto_schema_change_latency,
            merge_compaction_group_count,
            time_travel_version_replay_latency,
            compaction_group_count,
            compaction_group_size,
            compaction_group_file_count,
            compaction_group_throughput,
        }
    }

    #[cfg(test)]
    pub fn for_test(registry: &Registry) -> Self {
        Self::new(registry)
    }
}

impl Default for MetaMetrics {
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}

/// Refresh `system_param_info` metrics by reading current system parameters.
pub async fn refresh_system_param_info_metrics(
    system_params_controller: &SystemParamsControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let params_info = system_params_controller.get_params().await.get_all();

    meta_metrics.system_param_info.reset();
    for info in params_info {
        meta_metrics
            .system_param_info
            .with_label_values(&[info.name, &info.value])
            .set(1);
    }
}
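
// The refresh above exports one constant-1 series per parameter, keyed by its
// labels; the exposition looks roughly like the following (the parameter name
// and value shown are hypothetical):
//
//     system_param_info{name="checkpoint_frequency", value="1"} 1
//
// Resetting before re-reporting drops series whose (name, value) pair changed,
// so stale label combinations do not linger between refreshes.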

pub fn start_worker_info_monitor(
    metadata_manager: MetadataManager,
    election_client: ElectionClientRef,
    interval: Duration,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval = tokio::time::interval(interval);
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for interval
                _ = monitor_interval.tick() => {},
                // Shutdown monitor
                _ = &mut shutdown_rx => {
                    tracing::info!("Worker number monitor is stopped");
                    return;
                }
            }

            let node_map = match metadata_manager.count_worker_node().await {
                Ok(node_map) => node_map,
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
                    continue;
                }
            };

            // Reset metrics to clean the stale labels e.g. invalid lease ids
            meta_metrics.worker_num.reset();
            meta_metrics.meta_type.reset();

            for (worker_type, worker_num) in node_map {
                meta_metrics
                    .worker_num
                    .with_label_values(&[worker_type.as_str_name()])
                    .set(worker_num as i64);
            }
            if let Ok(meta_members) = election_client.get_members().await {
                meta_metrics
                    .worker_num
                    .with_label_values(&[WorkerType::Meta.as_str_name()])
                    .set(meta_members.len() as i64);
                meta_members.into_iter().for_each(|m| {
                    let role = if m.is_leader { "leader" } else { "follower" };
                    meta_metrics
                        .meta_type
                        .with_label_values(&[m.id.as_str(), role])
                        .set(1);
                });
            }
        }
    });

    (join_handle, shutdown_tx)
}
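
// Illustrative wiring (a sketch; the real call site lives in the meta node
// startup path, and the interval shown is an arbitrary example):
//
//     let (handle, shutdown) = start_worker_info_monitor(
//         metadata_manager.clone(),
//         election_client.clone(),
//         Duration::from_secs(60),
//         Arc::new(MetaMetrics::default()),
//     );
//     // ... on graceful shutdown:
//     let _ = shutdown.send(());
//     let _ = handle.await;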

pub async fn refresh_fragment_info_metrics(
    catalog_controller: &CatalogControllerRef,
    cluster_controller: &ClusterControllerRef,
    hummock_manager: &HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let worker_nodes = match cluster_controller
        .list_workers(Some(WorkerType::ComputeNode.into()), None)
        .await
    {
        Ok(worker_nodes) => worker_nodes,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list worker node");
            return;
        }
    };
    let actor_locations = match catalog_controller.list_actor_locations() {
        Ok(actor_locations) => actor_locations,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
            return;
        }
    };
    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
        Ok(sink_actor_mapping) => sink_actor_mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
            return;
        }
    };
    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
        Ok(fragment_state_tables) => fragment_state_tables,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
            return;
        }
    };
    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
        Ok(mapping) => mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
            return;
        }
    };

    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
        .into_iter()
        .map(|worker_node| {
            let addr = match worker_node.host {
                Some(host) => format!("{}:{}", host.host, host.port),
                None => "".to_owned(),
            };
            (worker_node.id as WorkerId, addr)
        })
        .collect();
    let table_compaction_group_id_mapping = hummock_manager
        .get_table_compaction_group_id_mapping()
        .await;

    // Start fresh with a reset to clear all outdated labels. This is safe since we always
    // report full info on each interval.
    meta_metrics.actor_info.reset();
    meta_metrics.table_info.reset();
    meta_metrics.sink_info.reset();
    for actor_location in actor_locations {
        let actor_id_str = actor_location.actor_id.to_string();
        let fragment_id_str = actor_location.fragment_id.to_string();
        // Report a dummy gauge metric with (actor id, fragment id, node address) as its label.
        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
            meta_metrics
                .actor_info
                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
                .set(1);
        }
    }
    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
        let sink_id_str = sink_id.to_string();
        for actor_id in actor_ids {
            let actor_id_str = actor_id.to_string();
            meta_metrics
                .sink_info
                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
                .set(1);
        }
    }
    for PartialFragmentStateTables {
        fragment_id,
        job_id,
        state_table_ids,
    } in fragment_state_tables
    {
        let fragment_id_str = fragment_id.to_string();
        let job_id_str = job_id.to_string();
        for table_id in state_table_ids.into_inner() {
            let table_id_str = table_id.to_string();
            let (table_name, table_type) = table_name_and_type_mapping
                .get(&table_id)
                .cloned()
                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
            let compaction_group_id = table_compaction_group_id_mapping
                .get(&(table_id as u32))
                .map(|cg_id| cg_id.to_string())
                .unwrap_or_else(|| "unknown".to_owned());
            meta_metrics
                .table_info
                .with_label_values(&[
                    &job_id_str,
                    &table_id_str,
                    &fragment_id_str,
                    &table_name,
                    &table_type,
                    &compaction_group_id,
                ])
                .set(1);
        }
    }
}
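
// After a refresh, fragment info is exposed as constant-1 series that carry all
// information in their labels, e.g. (the ids and address below are made up):
//
//     actor_info{actor_id="1", fragment_id="2", compute_node="127.0.0.1:5688"} 1
//
// Consumers join on these labels rather than reading the gauge value itself.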

pub async fn refresh_relation_info_metrics(
    catalog_controller: &CatalogControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let table_objects = match catalog_controller.list_table_objects().await {
        Ok(table_objects) => table_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table objects");
            return;
        }
    };

    let source_objects = match catalog_controller.list_source_objects().await {
        Ok(source_objects) => source_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get source objects");
            return;
        }
    };

    let sink_objects = match catalog_controller.list_sink_objects().await {
        Ok(sink_objects) => sink_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
            return;
        }
    };

    meta_metrics.relation_info.reset();

    for (id, db, schema, name, resource_group) in table_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                "table",
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in source_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                "source",
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in sink_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                "sink",
            ])
            .set(1);
    }
}

pub fn start_info_monitor(
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    system_params_controller: SystemParamsControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    const COLLECT_INTERVAL_SECONDS: u64 = 60;

    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval =
            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for interval
                _ = monitor_interval.tick() => {},
                // Shutdown monitor
                _ = &mut shutdown_rx => {
                    tracing::info!("Meta info monitor is stopped");
                    return;
                }
            }

            // Fragment and relation info
            refresh_fragment_info_metrics(
                &metadata_manager.catalog_controller,
                &metadata_manager.cluster_controller,
                &hummock_manager,
                meta_metrics.clone(),
            )
            .await;

            refresh_relation_info_metrics(
                &metadata_manager.catalog_controller,
                meta_metrics.clone(),
            )
            .await;

            // System parameter info
            refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
                .await;
        }
    });

    (join_handle, shutdown_tx)
}
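
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sanity check, added here as an illustrative sketch (not part of
    // the original test suite): constructing the metrics against a fresh
    // registry must succeed, and label-based lookups must be recordable. The
    // label values used below are arbitrary.
    #[test]
    fn register_and_record_against_fresh_registry() {
        let registry = Registry::new();
        let metrics = MetaMetrics::for_test(&registry);

        // Record a gRPC latency observation under a made-up path label.
        metrics
            .grpc_latency
            .with_label_values(&["/meta/test"])
            .observe(0.001);
        // Record a worker count for an arbitrary worker type label.
        metrics
            .worker_num
            .with_label_values(&["COMPUTE_NODE"])
            .set(1);

        // The families registered above should now be gatherable.
        assert!(!registry.gather().is_empty());
    }
}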