risingwave_meta/rpc/
metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::AtomicU64;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicF64, GenericGaugeVec};
21use prometheus::{
22    GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
23    exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
24    register_histogram_vec_with_registry, register_histogram_with_registry,
25    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
26    register_int_gauge_with_registry,
27};
28use risingwave_common::metrics::{
29    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
30    LabelGuardedUintGaugeVec,
31};
32use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::{
35    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
36    register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
37};
38use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
39use risingwave_meta_model::WorkerId;
40use risingwave_object_store::object::object_metrics::{
41    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
42};
43use risingwave_pb::common::WorkerType;
44use thiserror_ext::AsReport;
45use tokio::sync::oneshot::Sender;
46use tokio::task::JoinHandle;
47
48use crate::controller::catalog::CatalogControllerRef;
49use crate::controller::cluster::ClusterControllerRef;
50use crate::controller::system_param::SystemParamsControllerRef;
51use crate::controller::utils::PartialFragmentStateTables;
52use crate::hummock::HummockManagerRef;
53use crate::manager::MetadataManager;
54use crate::rpc::ElectionClientRef;
55
/// Prometheus collectors of the meta service, grouped by subsystem.
///
/// Most collectors are created and registered in `MetaMetrics::new` against the registry
/// passed to it; the process-wide instance is `GLOBAL_META_METRICS`. Where a field lists
/// labels, they are the label names given at registration time in `new`.
56#[derive(Clone)]
57pub struct MetaMetrics {
58    // ********************************** Meta ************************************
59    /// The number of workers in the cluster. Labels: (worker_type)
60    pub worker_num: IntGaugeVec,
61    /// The roles of all meta nodes in the cluster. Registered under the metric name `meta_num`.
    /// Labels: (worker_addr, role)
62    pub meta_type: IntGaugeVec,
63
64    // ********************************** gRPC ************************************
65    /// gRPC latency of meta services. Labels: (path)
66    pub grpc_latency: HistogramVec,
67
68    // ********************************** Barrier ************************************
69    /// The duration from barrier injection to commit.
70    /// It is the sum of inflight-latency, sync-latency and wait-commit-latency.
    /// Labels: (database_id)
71    pub barrier_latency: LabelGuardedHistogramVec,
72    /// The duration from barrier complete to commit
73    pub barrier_wait_commit_latency: Histogram,
74    /// Latency between each barrier send. Labels: (database_id)
75    pub barrier_send_latency: LabelGuardedHistogramVec,
76    /// The number of all barriers. It is the sum of barriers that are in-flight or completed but
77    /// waiting for other barriers. Labels: (database_id)
78    pub all_barrier_nums: LabelGuardedIntGaugeVec,
79    /// The number of in-flight barriers. Labels: (database_id)
80    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
81    /// The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.
    /// Labels: (database_id)
82    pub last_committed_barrier_time: IntGaugeVec,
83    /// The barrier interval of each database. Labels: (database_id)
84    pub barrier_interval_by_database: GaugeVec,
85
86    // ********************************** Snapshot Backfill ***************************
87    /// The barrier latency in seconds of `table_id` and snapshot backfill `barrier_type`
88    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec, // (table_id, barrier_type)
89    /// The latency of commit epoch of `table_id`
90    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec, // (table_id, )
91    /// The lag between the upstream epoch and the downstream epoch.
92    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec, // (table_id, )
93    /// The number of inflight barriers of `table_id`
94    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec, // (table_id, )
95
96    // ********************************** Recovery ************************************
    /// Number of failed recovery attempts. Labels: (recovery_type)
97    pub recovery_failure_cnt: IntCounterVec,
    /// Latency of the recovery process. Labels: (recovery_type)
98    pub recovery_latency: HistogramVec,
99
100    // ********************************** Hummock ************************************
101    /// Max committed epoch
102    pub max_committed_epoch: IntGauge,
103    /// Min committed epoch
104    pub min_committed_epoch: IntGauge,
105    /// The number of SSTs in each level. Labels: (level_index)
106    pub level_sst_num: IntGaugeVec,
107    /// The number of SSTs to be merged to next level in each level. Labels: (level_index)
108    pub level_compact_cnt: IntGaugeVec,
109    /// The number of compactions from one level to another level that have completed or failed.
    /// Labels: (compactor, group, task_type, result)
110    pub compact_frequency: IntCounterVec,
111    /// Size of each level. Labels: (level_index)
112    pub level_file_size: IntGaugeVec,
113    /// Hummock version size
114    pub version_size: IntGauge,
115    /// The version id of current version.
116    pub current_version_id: IntGauge,
117    /// The version id of checkpoint version.
118    pub checkpoint_version_id: IntGauge,
119    /// The smallest version id that is being pinned by worker nodes.
120    pub min_pinned_version_id: IntGauge,
121    /// The smallest version id that is being guarded by meta node safe points.
122    pub min_safepoint_version_id: IntGauge,
123    /// Compaction groups that are in write stop state. Labels: (compaction_group_id)
124    pub write_stop_compaction_groups: IntGaugeVec,
125    /// The number of attempts to trigger full GC.
126    pub full_gc_trigger_count: IntGauge,
127    /// The number of candidate objects to delete after scanning object store.
128    pub full_gc_candidate_object_count: Histogram,
129    /// The number of objects to delete after filtering by meta node.
130    pub full_gc_selected_object_count: Histogram,
131    /// Per-table stats in the current hummock version. Labels: (table_id, metric)
132    pub version_stats: IntGaugeVec,
133    /// Per-materialized-view stats in the current hummock version. Labels: (table_id, metric)
134    pub materialized_view_stats: IntGaugeVec,
135    /// Total number of objects that are no longer referenced by versions.
136    pub stale_object_count: IntGauge,
137    /// Total size of objects that are no longer referenced by versions.
138    pub stale_object_size: IntGauge,
139    /// Total number of objects that are still referenced by non-current versions.
140    pub old_version_object_count: IntGauge,
141    /// Total size of objects that are still referenced by non-current versions.
142    pub old_version_object_size: IntGauge,
143    /// Total number of objects that are referenced by time travel.
144    pub time_travel_object_count: IntGauge,
145    /// Total number of objects that are referenced by current version.
146    pub current_version_object_count: IntGauge,
147    /// Total size of objects that are referenced by current version.
148    pub current_version_object_size: IntGauge,
149    /// Total number of objects, including dangling objects.
150    pub total_object_count: IntGauge,
151    /// Total size of objects, including dangling objects.
152    pub total_object_size: IntGauge,
153    /// Number of objects per table change log. Labels: (table_id)
154    pub table_change_log_object_count: IntGaugeVec,
155    /// Size of objects per table change log. Labels: (table_id)
156    pub table_change_log_object_size: IntGaugeVec,
157    /// The number of hummock version delta logs.
158    pub delta_log_count: IntGauge,
159    /// Latency of version checkpoint
160    pub version_checkpoint_latency: Histogram,
161    /// Latency for hummock manager to acquire lock. Labels: (lock_name, lock_type)
162    pub hummock_manager_lock_time: HistogramVec,
163    /// Latency for hummock manager to really process a request after acquiring the lock.
    /// Labels: (method)
164    pub hummock_manager_real_process_time: HistogramVec,
165    /// The number of compactions from one level to another level that have been skipped.
    /// Labels: (level, type)
166    pub compact_skip_frequency: IntCounterVec,
167    /// Bytes of lsm tree needed to reach balance. Labels: (group)
168    pub compact_pending_bytes: IntGaugeVec,
169    /// Per level compression ratio. Labels: (group, level, algorithm)
170    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
171    /// Per level number of running compaction task. Labels: (task)
172    pub level_compact_task_cnt: IntGaugeVec,
    /// A plain shared atomic rather than a registered collector.
    /// NOTE(review): its unit and writer are not visible in this part of the file — confirm
    /// against the code that updates it.
173    pub time_after_last_observation: Arc<AtomicU64>,
    /// Level count of L0 compact tasks. Labels: (group, type)
174    pub l0_compact_level_count: HistogramVec,
    /// Total size of compact tasks that have been issued. Labels: (group, type)
175    pub compact_task_size: HistogramVec,
    /// File count of compact tasks. Labels: (group, type)
176    pub compact_task_file_count: HistogramVec,
    /// Count of compact task batches. Labels: (type)
177    pub compact_task_batch_count: HistogramVec,
    /// The number of times a compaction group split has been triggered. Labels: (group)
178    pub split_compaction_group_count: IntCounterVec,
    /// The number of state tables per compaction group. Labels: (group)
179    pub state_table_count: IntGaugeVec,
    /// The number of branched SSTs per compaction group. Labels: (group)
180    pub branched_sst_count: IntGaugeVec,
181    pub compact_task_trivial_move_sst_count: HistogramVec,
182
    /// Latency of each compaction event being consumed (the registered help text gives the
    /// unit as ms).
183    pub compaction_event_consumed_latency: Histogram,
    /// Latency of each iteration of the compaction event loop (the registered help text gives
    /// the unit as ms).
184    pub compaction_event_loop_iteration_latency: Histogram,
185
186    // ********************************** Object Store ************************************
187    // Object store related metrics (for backup/restore and version checkpoint)
188    pub object_store_metric: Arc<ObjectStoreMetrics>,
189
190    // ********************************** Source ************************************
191    /// Supervisor for which source is still up. Labels: (source_id, source_name)
192    pub source_is_up: LabelGuardedIntGaugeVec,
193    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,
194
195    // ********************************** Fragment ************************************
196    /// A dummy gauge metric whose labels carry the mapping from actor id to fragment id.
    /// Labels: (actor_id, fragment_id, compute_node)
197    pub actor_info: IntGaugeVec,
198    /// A dummy gauge metric whose labels carry per-table info.
    /// Labels: (materialized_view_id, table_id, fragment_id, table_name, table_type,
    /// compaction_group_id)
199    pub table_info: IntGaugeVec,
200    /// A dummy gauge metric whose labels carry the mapping from actor id to sink id.
    /// Labels: (actor_id, sink_id, sink_name)
201    pub sink_info: IntGaugeVec,
202    /// A dummy gauge metric whose labels carry relation info.
    /// Labels: (id, database, schema, name, resource_group, type)
203    pub relation_info: IntGaugeVec,
204
205    // ********************************** System Params ************************************
206    /// A dummy gauge metric with labels carrying system parameter info.
207    /// Labels: (name, value)
208    pub system_param_info: IntGaugeVec,
209
210    /// Write throughput of commit epoch for each table. Labels: (table_id)
211    pub table_write_throughput: IntCounterVec,
212
213    /// The number of times a compaction group merge has been triggered. Labels: (group)
214    pub merge_compaction_group_count: IntCounterVec,
215
216    // ********************************** Auto Schema Change ************************************
    /// Number of failed auto schema changes. Labels: (table_id, table_name)
217    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    /// Number of successful auto schema changes. Labels: (table_id, table_name)
218    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    /// Latency of the auto schema change process. Labels: (table_id, table_name)
219    pub auto_schema_change_latency: LabelGuardedHistogramVec,
220
221    pub time_travel_version_replay_latency: Histogram,
222
223    pub compaction_group_count: IntGauge,
224    pub compaction_group_size: IntGaugeVec,
225    pub compaction_group_file_count: IntGaugeVec,
226    pub compaction_group_throughput: IntGaugeVec,
227
228    // ********************************** Refresh Manager ************************************
229    pub refresh_job_duration: LabelGuardedUintGaugeVec,
230    pub refresh_job_finish_cnt: LabelGuardedIntCounterVec,
231    pub refresh_cron_job_trigger_cnt: LabelGuardedIntCounterVec,
232    pub refresh_cron_job_miss_cnt: LabelGuardedIntCounterVec,
233}
234
/// Process-wide [`MetaMetrics`] instance.
///
/// Built lazily on first access by calling `MetaMetrics::new` with
/// `GLOBAL_METRICS_REGISTRY`, so its collectors are registered in that registry.
235pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
236    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));
237
238impl MetaMetrics {
239    fn new(registry: &Registry) -> Self {
240        let opts = histogram_opts!(
241            "meta_grpc_duration_seconds",
242            "gRPC latency of meta services",
243            exponential_buckets(0.0001, 2.0, 20).unwrap() // max 52s
244        );
245        let grpc_latency =
246            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();
247
248        let opts = histogram_opts!(
249            "meta_barrier_duration_seconds",
250            "barrier latency",
251            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
252        );
253        let barrier_latency =
254            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
255                .unwrap();
256
257        let opts = histogram_opts!(
258            "meta_barrier_wait_commit_duration_seconds",
259            "barrier_wait_commit_latency",
260            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
261        );
262        let barrier_wait_commit_latency =
263            register_histogram_with_registry!(opts, registry).unwrap();
264
265        let opts = histogram_opts!(
266            "meta_barrier_send_duration_seconds",
267            "barrier send latency",
268            exponential_buckets(0.1, 1.5, 19).unwrap() // max 148s
269        );
270        let barrier_send_latency =
271            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
272                .unwrap();
273        let barrier_interval_by_database = register_gauge_vec_with_registry!(
274            "meta_barrier_interval_by_database",
275            "barrier interval of each database",
276            &["database_id"],
277            registry
278        )
279        .unwrap();
280
281        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
282            "all_barrier_nums",
283            "num of of all_barrier",
284            &["database_id"],
285            registry
286        )
287        .unwrap();
288        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
289            "in_flight_barrier_nums",
290            "num of of in_flight_barrier",
291            &["database_id"],
292            registry
293        )
294        .unwrap();
295        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
296            "last_committed_barrier_time",
297            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
298            &["database_id"],
299            registry
300        )
301        .unwrap();
302
303        // snapshot backfill metrics
304        let opts = histogram_opts!(
305            "meta_snapshot_backfill_barrier_duration_seconds",
306            "snapshot backfill barrier latency",
307            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
308        );
309        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
310            opts,
311            &["table_id", "barrier_type"],
312            registry
313        )
314        .unwrap();
315        let opts = histogram_opts!(
316            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
317            "snapshot backfill barrier_wait_commit_latency",
318            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
319        );
320        let snapshot_backfill_wait_commit_latency =
321            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
322
323        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
324            "meta_snapshot_backfill_upstream_lag",
325            "snapshot backfill upstream_lag",
326            &["table_id"],
327            registry
328        )
329        .unwrap();
330        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
331            "meta_snapshot_backfill_inflight_barrier_num",
332            "snapshot backfill inflight_barrier_num",
333            &["table_id"],
334            registry
335        )
336        .unwrap();
337
338        let max_committed_epoch = register_int_gauge_with_registry!(
339            "storage_max_committed_epoch",
340            "max committed epoch",
341            registry
342        )
343        .unwrap();
344
345        let min_committed_epoch = register_int_gauge_with_registry!(
346            "storage_min_committed_epoch",
347            "min committed epoch",
348            registry
349        )
350        .unwrap();
351
352        let level_sst_num = register_int_gauge_vec_with_registry!(
353            "storage_level_sst_num",
354            "num of SSTs in each level",
355            &["level_index"],
356            registry
357        )
358        .unwrap();
359
360        let level_compact_cnt = register_int_gauge_vec_with_registry!(
361            "storage_level_compact_cnt",
362            "num of SSTs to be merged to next level in each level",
363            &["level_index"],
364            registry
365        )
366        .unwrap();
367
368        let compact_frequency = register_int_counter_vec_with_registry!(
369            "storage_level_compact_frequency",
370            "The number of compactions from one level to another level that have completed or failed.",
371            &["compactor", "group", "task_type", "result"],
372            registry
373        )
374        .unwrap();
375        let compact_skip_frequency = register_int_counter_vec_with_registry!(
376            "storage_skip_compact_frequency",
377            "The number of compactions from one level to another level that have been skipped.",
378            &["level", "type"],
379            registry
380        )
381        .unwrap();
382
383        let version_size =
384            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
385                .unwrap();
386
387        let current_version_id = register_int_gauge_with_registry!(
388            "storage_current_version_id",
389            "current version id",
390            registry
391        )
392        .unwrap();
393
394        let checkpoint_version_id = register_int_gauge_with_registry!(
395            "storage_checkpoint_version_id",
396            "checkpoint version id",
397            registry
398        )
399        .unwrap();
400
401        let min_pinned_version_id = register_int_gauge_with_registry!(
402            "storage_min_pinned_version_id",
403            "min pinned version id",
404            registry
405        )
406        .unwrap();
407
408        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
409            "storage_write_stop_compaction_groups",
410            "compaction groups of write stop state",
411            &["compaction_group_id"],
412            registry
413        )
414        .unwrap();
415
416        let full_gc_trigger_count = register_int_gauge_with_registry!(
417            "storage_full_gc_trigger_count",
418            "the number of attempts to trigger full GC",
419            registry
420        )
421        .unwrap();
422
423        let opts = histogram_opts!(
424            "storage_full_gc_candidate_object_count",
425            "the number of candidate object to delete after scanning object store",
426            exponential_buckets(1.0, 10.0, 6).unwrap()
427        );
428        let full_gc_candidate_object_count =
429            register_histogram_with_registry!(opts, registry).unwrap();
430
431        let opts = histogram_opts!(
432            "storage_full_gc_selected_object_count",
433            "the number of object to delete after filtering by meta node",
434            exponential_buckets(1.0, 10.0, 6).unwrap()
435        );
436        let full_gc_selected_object_count =
437            register_histogram_with_registry!(opts, registry).unwrap();
438
439        let min_safepoint_version_id = register_int_gauge_with_registry!(
440            "storage_min_safepoint_version_id",
441            "min safepoint version id",
442            registry
443        )
444        .unwrap();
445
446        let level_file_size = register_int_gauge_vec_with_registry!(
447            "storage_level_total_file_size",
448            "KBs total file bytes in each level",
449            &["level_index"],
450            registry
451        )
452        .unwrap();
453
454        let version_stats = register_int_gauge_vec_with_registry!(
455            "storage_version_stats",
456            "per table stats in current hummock version",
457            &["table_id", "metric"],
458            registry
459        )
460        .unwrap();
461
462        let materialized_view_stats = register_int_gauge_vec_with_registry!(
463            "storage_materialized_view_stats",
464            "per materialized view stats in current hummock version",
465            &["table_id", "metric"],
466            registry
467        )
468        .unwrap();
469
470        let stale_object_count = register_int_gauge_with_registry!(
471            "storage_stale_object_count",
472            "total number of objects that is no longer referenced by versions.",
473            registry
474        )
475        .unwrap();
476
477        let stale_object_size = register_int_gauge_with_registry!(
478            "storage_stale_object_size",
479            "total size of objects that is no longer referenced by versions.",
480            registry
481        )
482        .unwrap();
483
484        let old_version_object_count = register_int_gauge_with_registry!(
485            "storage_old_version_object_count",
486            "total number of objects that is still referenced by non-current versions",
487            registry
488        )
489        .unwrap();
490
491        let old_version_object_size = register_int_gauge_with_registry!(
492            "storage_old_version_object_size",
493            "total size of objects that is still referenced by non-current versions",
494            registry
495        )
496        .unwrap();
497
498        let current_version_object_count = register_int_gauge_with_registry!(
499            "storage_current_version_object_count",
500            "total number of objects that is referenced by current version",
501            registry
502        )
503        .unwrap();
504
505        let current_version_object_size = register_int_gauge_with_registry!(
506            "storage_current_version_object_size",
507            "total size of objects that is referenced by current version",
508            registry
509        )
510        .unwrap();
511
512        let total_object_count = register_int_gauge_with_registry!(
513            "storage_total_object_count",
514            "Total number of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
515            registry
516        ).unwrap();
517
518        let total_object_size = register_int_gauge_with_registry!(
519            "storage_total_object_size",
520            "Total size of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
521            registry
522        ).unwrap();
523
524        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
525            "storage_table_change_log_object_count",
526            "per table change log object count",
527            &["table_id"],
528            registry
529        )
530        .unwrap();
531
532        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
533            "storage_table_change_log_object_size",
534            "per table change log object size",
535            &["table_id"],
536            registry
537        )
538        .unwrap();
539
540        let time_travel_object_count = register_int_gauge_with_registry!(
541            "storage_time_travel_object_count",
542            "total number of objects that is referenced by time travel.",
543            registry
544        )
545        .unwrap();
546
547        let delta_log_count = register_int_gauge_with_registry!(
548            "storage_delta_log_count",
549            "total number of hummock version delta log",
550            registry
551        )
552        .unwrap();
553
554        let opts = histogram_opts!(
555            "storage_version_checkpoint_latency",
556            "hummock version checkpoint latency",
557            exponential_buckets(0.1, 1.5, 20).unwrap()
558        );
559        let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();
560
561        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
562            "hummock_manager_lock_time",
563            "latency for hummock manager to acquire the rwlock",
564            &["lock_name", "lock_type"],
565            registry
566        )
567        .unwrap();
568
569        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
570            "meta_hummock_manager_real_process_time",
571            "latency for hummock manager to really process the request",
572            &["method"],
573            registry
574        )
575        .unwrap();
576
577        let worker_num = register_int_gauge_vec_with_registry!(
578            "worker_num",
579            "number of nodes in the cluster",
580            &["worker_type"],
581            registry,
582        )
583        .unwrap();
584
585        let meta_type = register_int_gauge_vec_with_registry!(
586            "meta_num",
587            "role of meta nodes in the cluster",
588            &["worker_addr", "role"],
589            registry,
590        )
591        .unwrap();
592
593        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
594            "storage_compact_pending_bytes",
595            "bytes of lsm tree needed to reach balance",
596            &["group"],
597            registry
598        )
599        .unwrap();
600
601        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
602            "storage_compact_level_compression_ratio",
603            "compression ratio of each level of the lsm tree",
604            &["group", "level", "algorithm"],
605            registry
606        )
607        .unwrap();
608
609        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
610            "storage_level_compact_task_cnt",
611            "num of compact_task organized by group and level",
612            &["task"],
613            registry
614        )
615        .unwrap();
616        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
617
618        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
619            "recovery_failure_cnt",
620            "Number of failed recovery attempts",
621            &["recovery_type"],
622            registry
623        )
624        .unwrap();
625        let opts = histogram_opts!(
626            "recovery_latency",
627            "Latency of the recovery process",
628            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
629        );
630        let recovery_latency =
631            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();
632
633        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
634            "auto_schema_change_failure_cnt",
635            "Number of failed auto schema change",
636            &["table_id", "table_name"],
637            registry
638        )
639        .unwrap();
640
641        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
642            "auto_schema_change_success_cnt",
643            "Number of success auto schema change",
644            &["table_id", "table_name"],
645            registry
646        )
647        .unwrap();
648
649        let opts = histogram_opts!(
650            "auto_schema_change_latency",
651            "Latency of the auto schema change process",
652            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
653        );
654        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
655            opts,
656            &["table_id", "table_name"],
657            registry
658        )
659        .unwrap();
660
661        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
662            "source_status_is_up",
663            "source is up or not",
664            &["source_id", "source_name"],
665            registry
666        )
667        .unwrap();
668        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());
669
670        let actor_info = register_int_gauge_vec_with_registry!(
671            "actor_info",
672            "Mapping from actor id to (fragment id, compute node)",
673            &["actor_id", "fragment_id", "compute_node"],
674            registry
675        )
676        .unwrap();
677
678        let table_info = register_int_gauge_vec_with_registry!(
679            "table_info",
680            "Mapping from table id to (actor id, table name)",
681            &[
682                "materialized_view_id",
683                "table_id",
684                "fragment_id",
685                "table_name",
686                "table_type",
687                "compaction_group_id"
688            ],
689            registry
690        )
691        .unwrap();
692
693        let sink_info = register_int_gauge_vec_with_registry!(
694            "sink_info",
695            "Mapping from actor id to (actor id, sink name)",
696            &["actor_id", "sink_id", "sink_name",],
697            registry
698        )
699        .unwrap();
700
701        let relation_info = register_int_gauge_vec_with_registry!(
702            "relation_info",
703            "Information of the database relation (table/source/sink)",
704            &["id", "database", "schema", "name", "resource_group", "type"],
705            registry
706        )
707        .unwrap();
708
709        let l0_compact_level_count = register_histogram_vec_with_registry!(
710            "storage_l0_compact_level_count",
711            "level_count of l0 compact task",
712            &["group", "type"],
713            registry
714        )
715        .unwrap();
716
717        // System parameter info
718        let system_param_info = register_int_gauge_vec_with_registry!(
719            "system_param_info",
720            "Information of system parameters",
721            &["name", "value"],
722            registry
723        )
724        .unwrap();
725
726        let opts = histogram_opts!(
727            "storage_compact_task_size",
728            "Total size of compact that have been issued to state store",
729            exponential_buckets(1048576.0, 2.0, 16).unwrap()
730        );
731
732        let compact_task_size =
733            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();
734
735        let compact_task_file_count = register_histogram_vec_with_registry!(
736            "storage_compact_task_file_count",
737            "file count of compact task",
738            &["group", "type"],
739            registry
740        )
741        .unwrap();
742        let opts = histogram_opts!(
743            "storage_compact_task_batch_count",
744            "count of compact task batch",
745            exponential_buckets(1.0, 2.0, 8).unwrap()
746        );
747        let compact_task_batch_count =
748            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();
749
750        let table_write_throughput = register_int_counter_vec_with_registry!(
751            "storage_commit_write_throughput",
752            "The number of compactions from one level to another level that have been skipped.",
753            &["table_id"],
754            registry
755        )
756        .unwrap();
757
758        let split_compaction_group_count = register_int_counter_vec_with_registry!(
759            "storage_split_compaction_group_count",
760            "Count of trigger split compaction group",
761            &["group"],
762            registry
763        )
764        .unwrap();
765
766        let state_table_count = register_int_gauge_vec_with_registry!(
767            "storage_state_table_count",
768            "Count of stable table per compaction group",
769            &["group"],
770            registry
771        )
772        .unwrap();
773
774        let branched_sst_count = register_int_gauge_vec_with_registry!(
775            "storage_branched_sst_count",
776            "Count of branched sst per compaction group",
777            &["group"],
778            registry
779        )
780        .unwrap();
781
782        let opts = histogram_opts!(
783            "storage_compaction_event_consumed_latency",
784            "The latency(ms) of each event being consumed",
785            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
786        );
787        let compaction_event_consumed_latency =
788            register_histogram_with_registry!(opts, registry).unwrap();
789
790        let opts = histogram_opts!(
791            "storage_compaction_event_loop_iteration_latency",
792            "The latency(ms) of each iteration of the compaction event loop",
793            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
794        );
795        let compaction_event_loop_iteration_latency =
796            register_histogram_with_registry!(opts, registry).unwrap();
797
798        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
799            "storage_merge_compaction_group_count",
800            "Count of trigger merge compaction group",
801            &["group"],
802            registry
803        )
804        .unwrap();
805
806        let opts = histogram_opts!(
807            "storage_time_travel_version_replay_latency",
808            "The latency(ms) of replaying a hummock version for time travel",
809            exponential_buckets(0.01, 10.0, 6).unwrap()
810        );
811        let time_travel_version_replay_latency =
812            register_histogram_with_registry!(opts, registry).unwrap();
813
814        let compaction_group_count = register_int_gauge_with_registry!(
815            "storage_compaction_group_count",
816            "The number of compaction groups",
817            registry,
818        )
819        .unwrap();
820
821        let compaction_group_size = register_int_gauge_vec_with_registry!(
822            "storage_compaction_group_size",
823            "The size of compaction group",
824            &["group"],
825            registry
826        )
827        .unwrap();
828
829        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
830            "storage_compaction_group_file_count",
831            "The file count of compaction group",
832            &["group"],
833            registry
834        )
835        .unwrap();
836
837        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
838            "storage_compaction_group_throughput",
839            "The throughput of compaction group",
840            &["group"],
841            registry
842        )
843        .unwrap();
844
845        let opts = histogram_opts!(
846            "storage_compact_task_trivial_move_sst_count",
847            "sst count of compact trivial-move task",
848            exponential_buckets(1.0, 2.0, 8).unwrap()
849        );
850        let compact_task_trivial_move_sst_count =
851            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();
852
853        let refresh_job_duration = register_guarded_uint_gauge_vec_with_registry!(
854            "meta_refresh_job_duration",
855            "The duration of refresh job",
856            &["table_id", "status"],
857            registry
858        )
859        .unwrap();
860        let refresh_job_finish_cnt = register_guarded_int_counter_vec_with_registry!(
861            "meta_refresh_job_finish_cnt",
862            "The number of finished refresh jobs",
863            &["table_id", "status"],
864            registry
865        )
866        .unwrap();
867        let refresh_cron_job_trigger_cnt = register_guarded_int_counter_vec_with_registry!(
868            "meta_refresh_cron_job_trigger_cnt",
869            "The number of cron refresh jobs triggered",
870            &["table_id"],
871            registry
872        )
873        .unwrap();
874        let refresh_cron_job_miss_cnt = register_guarded_int_counter_vec_with_registry!(
875            "meta_refresh_cron_job_miss_cnt",
876            "The number of cron refresh jobs missed",
877            &["table_id"],
878            registry
879        )
880        .unwrap();
881
882        Self {
883            grpc_latency,
884            barrier_latency,
885            barrier_wait_commit_latency,
886            barrier_send_latency,
887            all_barrier_nums,
888            in_flight_barrier_nums,
889            last_committed_barrier_time,
890            barrier_interval_by_database,
891            snapshot_backfill_barrier_latency,
892            snapshot_backfill_wait_commit_latency,
893            snapshot_backfill_lag,
894            snapshot_backfill_inflight_barrier_num,
895            recovery_failure_cnt,
896            recovery_latency,
897
898            max_committed_epoch,
899            min_committed_epoch,
900            level_sst_num,
901            level_compact_cnt,
902            compact_frequency,
903            compact_skip_frequency,
904            level_file_size,
905            version_size,
906            version_stats,
907            materialized_view_stats,
908            stale_object_count,
909            stale_object_size,
910            old_version_object_count,
911            old_version_object_size,
912            time_travel_object_count,
913            current_version_object_count,
914            current_version_object_size,
915            total_object_count,
916            total_object_size,
917            table_change_log_object_count,
918            table_change_log_object_size,
919            delta_log_count,
920            version_checkpoint_latency,
921            current_version_id,
922            checkpoint_version_id,
923            min_pinned_version_id,
924            min_safepoint_version_id,
925            write_stop_compaction_groups,
926            full_gc_trigger_count,
927            full_gc_candidate_object_count,
928            full_gc_selected_object_count,
929            hummock_manager_lock_time,
930            hummock_manager_real_process_time,
931            time_after_last_observation: Arc::new(AtomicU64::new(0)),
932            worker_num,
933            meta_type,
934            compact_pending_bytes,
935            compact_level_compression_ratio,
936            level_compact_task_cnt,
937            object_store_metric,
938            source_is_up,
939            source_enumerator_metrics,
940            actor_info,
941            table_info,
942            sink_info,
943            relation_info,
944            system_param_info,
945            l0_compact_level_count,
946            compact_task_size,
947            compact_task_file_count,
948            compact_task_batch_count,
949            compact_task_trivial_move_sst_count,
950            table_write_throughput,
951            split_compaction_group_count,
952            state_table_count,
953            branched_sst_count,
954            compaction_event_consumed_latency,
955            compaction_event_loop_iteration_latency,
956            auto_schema_change_failure_cnt,
957            auto_schema_change_success_cnt,
958            auto_schema_change_latency,
959            merge_compaction_group_count,
960            time_travel_version_replay_latency,
961            compaction_group_count,
962            compaction_group_size,
963            compaction_group_file_count,
964            compaction_group_throughput,
965            refresh_job_duration,
966            refresh_job_finish_cnt,
967            refresh_cron_job_trigger_cnt,
968            refresh_cron_job_miss_cnt,
969        }
970    }
971
972    #[cfg(test)]
973    pub fn for_test(registry: &Registry) -> Self {
974        Self::new(registry)
975    }
976}
977impl Default for MetaMetrics {
978    fn default() -> Self {
979        GLOBAL_META_METRICS.clone()
980    }
981}
982
983/// Refresh `system_param_info` metrics by reading current system parameters.
984pub async fn refresh_system_param_info_metrics(
985    system_params_controller: &SystemParamsControllerRef,
986    meta_metrics: Arc<MetaMetrics>,
987) {
988    let params_info = system_params_controller.get_params().await.get_all();
989
990    meta_metrics.system_param_info.reset();
991    for info in params_info {
992        meta_metrics
993            .system_param_info
994            .with_label_values(&[info.name, &info.value])
995            .set(1);
996    }
997}
998
999pub fn start_worker_info_monitor(
1000    metadata_manager: MetadataManager,
1001    election_client: ElectionClientRef,
1002    interval: Duration,
1003    meta_metrics: Arc<MetaMetrics>,
1004) -> (JoinHandle<()>, Sender<()>) {
1005    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1006    let join_handle = tokio::spawn(async move {
1007        let mut monitor_interval = tokio::time::interval(interval);
1008        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1009        loop {
1010            tokio::select! {
1011                // Wait for interval
1012                _ = monitor_interval.tick() => {},
1013                // Shutdown monitor
1014                _ = &mut shutdown_rx => {
1015                    tracing::info!("Worker number monitor is stopped");
1016                    return;
1017                }
1018            }
1019
1020            let node_map = match metadata_manager.count_worker_node().await {
1021                Ok(node_map) => node_map,
1022                Err(err) => {
1023                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
1024                    continue;
1025                }
1026            };
1027
1028            // Reset metrics to clean the stale labels e.g. invalid lease ids
1029            meta_metrics.worker_num.reset();
1030            meta_metrics.meta_type.reset();
1031
1032            for (worker_type, worker_num) in node_map {
1033                meta_metrics
1034                    .worker_num
1035                    .with_label_values(&[(worker_type.as_str_name())])
1036                    .set(worker_num as i64);
1037            }
1038            if let Ok(meta_members) = election_client.get_members().await {
1039                meta_metrics
1040                    .worker_num
1041                    .with_label_values(&[WorkerType::Meta.as_str_name()])
1042                    .set(meta_members.len() as i64);
1043                meta_members.into_iter().for_each(|m| {
1044                    let role = if m.is_leader { "leader" } else { "follower" };
1045                    meta_metrics
1046                        .meta_type
1047                        .with_label_values(&[m.id.as_str(), role])
1048                        .set(1);
1049                });
1050            }
1051        }
1052    });
1053
1054    (join_handle, shutdown_tx)
1055}
1056
1057pub async fn refresh_fragment_info_metrics(
1058    catalog_controller: &CatalogControllerRef,
1059    cluster_controller: &ClusterControllerRef,
1060    hummock_manager: &HummockManagerRef,
1061    meta_metrics: Arc<MetaMetrics>,
1062) {
1063    let worker_nodes = match cluster_controller
1064        .list_workers(Some(WorkerType::ComputeNode.into()), None)
1065        .await
1066    {
1067        Ok(worker_nodes) => worker_nodes,
1068        Err(err) => {
1069            tracing::warn!(error=%err.as_report(), "fail to list worker node");
1070            return;
1071        }
1072    };
1073    let actor_locations = match catalog_controller.list_actor_locations() {
1074        Ok(actor_locations) => actor_locations,
1075        Err(err) => {
1076            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
1077            return;
1078        }
1079    };
1080    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
1081        Ok(sink_actor_mapping) => sink_actor_mapping,
1082        Err(err) => {
1083            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
1084            return;
1085        }
1086    };
1087    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
1088        Ok(fragment_state_tables) => fragment_state_tables,
1089        Err(err) => {
1090            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
1091            return;
1092        }
1093    };
1094    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
1095        Ok(mapping) => mapping,
1096        Err(err) => {
1097            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
1098            return;
1099        }
1100    };
1101
1102    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
1103        .into_iter()
1104        .map(|worker_node| {
1105            let addr = match worker_node.host {
1106                Some(host) => format!("{}:{}", host.host, host.port),
1107                None => "".to_owned(),
1108            };
1109            (worker_node.id, addr)
1110        })
1111        .collect();
1112    let table_compaction_group_id_mapping = hummock_manager
1113        .get_table_compaction_group_id_mapping()
1114        .await;
1115
1116    // Start fresh with a reset to clear all outdated labels. This is safe since we always
1117    // report full info on each interval.
1118    meta_metrics.actor_info.reset();
1119    meta_metrics.table_info.reset();
1120    meta_metrics.sink_info.reset();
1121    for actor_location in actor_locations {
1122        let actor_id_str = actor_location.actor_id.to_string();
1123        let fragment_id_str = actor_location.fragment_id.to_string();
1124        // Report a dummy gauge metrics with (fragment id, actor id, node
1125        // address) as its label
1126        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
1127            meta_metrics
1128                .actor_info
1129                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
1130                .set(1);
1131        }
1132    }
1133    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
1134        let sink_id_str = sink_id.to_string();
1135        for actor_id in actor_ids {
1136            let actor_id_str = actor_id.to_string();
1137            meta_metrics
1138                .sink_info
1139                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
1140                .set(1);
1141        }
1142    }
1143    for PartialFragmentStateTables {
1144        fragment_id,
1145        job_id,
1146        state_table_ids,
1147    } in fragment_state_tables
1148    {
1149        let fragment_id_str = fragment_id.to_string();
1150        let job_id_str = job_id.to_string();
1151        for table_id in state_table_ids.into_inner() {
1152            let table_id_str = table_id.to_string();
1153            let (table_name, table_type) = table_name_and_type_mapping
1154                .get(&table_id)
1155                .cloned()
1156                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
1157            let compaction_group_id = table_compaction_group_id_mapping
1158                .get(&table_id)
1159                .map(|cg_id| cg_id.to_string())
1160                .unwrap_or_else(|| "unknown".to_owned());
1161            meta_metrics
1162                .table_info
1163                .with_label_values(&[
1164                    &job_id_str,
1165                    &table_id_str,
1166                    &fragment_id_str,
1167                    &table_name,
1168                    &table_type,
1169                    &compaction_group_id,
1170                ])
1171                .set(1);
1172        }
1173    }
1174}
1175
1176pub async fn refresh_relation_info_metrics(
1177    catalog_controller: &CatalogControllerRef,
1178    meta_metrics: Arc<MetaMetrics>,
1179) {
1180    let table_objects = match catalog_controller.list_table_objects().await {
1181        Ok(table_objects) => table_objects,
1182        Err(err) => {
1183            tracing::warn!(error=%err.as_report(), "fail to get table objects");
1184            return;
1185        }
1186    };
1187
1188    let source_objects = match catalog_controller.list_source_objects().await {
1189        Ok(source_objects) => source_objects,
1190        Err(err) => {
1191            tracing::warn!(error=%err.as_report(), "fail to get source objects");
1192            return;
1193        }
1194    };
1195
1196    let sink_objects = match catalog_controller.list_sink_objects().await {
1197        Ok(sink_objects) => sink_objects,
1198        Err(err) => {
1199            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
1200            return;
1201        }
1202    };
1203
1204    meta_metrics.relation_info.reset();
1205
1206    for (id, db, schema, name, resource_group) in table_objects {
1207        meta_metrics
1208            .relation_info
1209            .with_label_values(&[
1210                &id.to_string(),
1211                &db,
1212                &schema,
1213                &name,
1214                &resource_group,
1215                &"table".to_owned(),
1216            ])
1217            .set(1);
1218    }
1219
1220    for (id, db, schema, name, resource_group) in source_objects {
1221        meta_metrics
1222            .relation_info
1223            .with_label_values(&[
1224                &id.to_string(),
1225                &db,
1226                &schema,
1227                &name,
1228                &resource_group,
1229                &"source".to_owned(),
1230            ])
1231            .set(1);
1232    }
1233
1234    for (id, db, schema, name, resource_group) in sink_objects {
1235        meta_metrics
1236            .relation_info
1237            .with_label_values(&[
1238                &id.to_string(),
1239                &db,
1240                &schema,
1241                &name,
1242                &resource_group,
1243                &"sink".to_owned(),
1244            ])
1245            .set(1);
1246    }
1247}
1248
1249pub fn start_info_monitor(
1250    metadata_manager: MetadataManager,
1251    hummock_manager: HummockManagerRef,
1252    system_params_controller: SystemParamsControllerRef,
1253    meta_metrics: Arc<MetaMetrics>,
1254) -> (JoinHandle<()>, Sender<()>) {
1255    const COLLECT_INTERVAL_SECONDS: u64 = 60;
1256
1257    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1258    let join_handle = tokio::spawn(async move {
1259        let mut monitor_interval =
1260            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
1261        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1262        loop {
1263            tokio::select! {
1264                // Wait for interval
1265                _ = monitor_interval.tick() => {},
1266                // Shutdown monitor
1267                _ = &mut shutdown_rx => {
1268                    tracing::info!("Meta info monitor is stopped");
1269                    return;
1270                }
1271            }
1272
1273            // Fragment and relation info
1274            refresh_fragment_info_metrics(
1275                &metadata_manager.catalog_controller,
1276                &metadata_manager.cluster_controller,
1277                &hummock_manager,
1278                meta_metrics.clone(),
1279            )
1280            .await;
1281
1282            refresh_relation_info_metrics(
1283                &metadata_manager.catalog_controller,
1284                meta_metrics.clone(),
1285            )
1286            .await;
1287
1288            // System parameter info
1289            refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
1290                .await;
1291        }
1292    });
1293
1294    (join_handle, shutdown_tx)
1295}