risingwave_meta/rpc/
metrics.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicF64, GenericGaugeVec};
use prometheus::{
    GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_meta_model::WorkerId;
use risingwave_object_store::object::object_metrics::{
    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
};
use risingwave_pb::common::WorkerType;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::ClusterControllerRef;
use crate::controller::utils::PartialFragmentStateTables;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::rpc::ElectionClientRef;

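/// Metrics reported by the meta node, registered against a Prometheus [`Registry`].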
#[derive(Clone)]
pub struct MetaMetrics {
    // ********************************** Meta ************************************
    /// The number of workers in the cluster.
    pub worker_num: IntGaugeVec,
    /// The roles of all meta nodes in the cluster.
    pub meta_type: IntGaugeVec,

    // ********************************** gRPC ************************************
    /// gRPC latency of meta services.
    pub grpc_latency: HistogramVec,

    // ********************************** Barrier ************************************
    /// The duration from barrier injection to commit.
    /// It is the sum of inflight-latency, sync-latency and wait-commit-latency.
    pub barrier_latency: LabelGuardedHistogramVec,
    /// The duration from barrier complete to commit.
    pub barrier_wait_commit_latency: Histogram,
    /// Latency between each barrier send.
    pub barrier_send_latency: LabelGuardedHistogramVec,
    /// The number of all barriers. It is the sum of barriers that are in-flight or completed but
    /// waiting for other barriers.
    pub all_barrier_nums: LabelGuardedIntGaugeVec,
    /// The number of in-flight barriers.
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
    /// The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.
    pub last_committed_barrier_time: IntGaugeVec,
    /// The barrier interval of each database.
    pub barrier_interval_by_database: GaugeVec,

    // ********************************** Snapshot Backfill ***************************
    /// The barrier latency in seconds of `table_id` and snapshot backfill `barrier_type`.
    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec, // (table_id, barrier_type)
    /// The latency of commit epoch of `table_id`.
    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec, // (table_id, )
    /// The lag between the upstream epoch and the downstream epoch.
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec, // (table_id, )
    /// The number of in-flight barriers of `table_id`.
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec, // (table_id, )

    // ********************************** Recovery ************************************
    pub recovery_failure_cnt: IntCounterVec,
    pub recovery_latency: HistogramVec,

    // ********************************** Hummock ************************************
    /// Max committed epoch.
    pub max_committed_epoch: IntGauge,
    /// Min committed epoch.
    pub min_committed_epoch: IntGauge,
    /// The number of SSTs in each level.
    pub level_sst_num: IntGaugeVec,
    /// The number of SSTs to be merged to the next level in each level.
    pub level_compact_cnt: IntGaugeVec,
    /// The number of compact tasks.
    pub compact_frequency: IntCounterVec,
    /// Size of each level.
    pub level_file_size: IntGaugeVec,
    /// Hummock version size.
    pub version_size: IntGauge,
    /// The version id of the current version.
    pub current_version_id: IntGauge,
    /// The version id of the checkpoint version.
    pub checkpoint_version_id: IntGauge,
    /// The smallest version id that is being pinned by worker nodes.
    pub min_pinned_version_id: IntGauge,
    /// The smallest version id that is being guarded by meta node safe points.
    pub min_safepoint_version_id: IntGauge,
    /// Compaction groups that are in the write-stop state.
    pub write_stop_compaction_groups: IntGaugeVec,
    /// The number of attempts to trigger full GC.
    pub full_gc_trigger_count: IntGauge,
    /// The number of candidate objects to delete after scanning the object store.
    pub full_gc_candidate_object_count: Histogram,
    /// The number of objects to delete after filtering by the meta node.
    pub full_gc_selected_object_count: Histogram,
    /// Hummock version stats.
    pub version_stats: IntGaugeVec,
    /// Materialized view stats in the current hummock version.
    pub materialized_view_stats: IntGaugeVec,
    /// Total number of objects that are no longer referenced by versions.
    pub stale_object_count: IntGauge,
    /// Total size of objects that are no longer referenced by versions.
    pub stale_object_size: IntGauge,
    /// Total number of objects that are still referenced by non-current versions.
    pub old_version_object_count: IntGauge,
    /// Total size of objects that are still referenced by non-current versions.
    pub old_version_object_size: IntGauge,
    /// Total number of objects that are referenced by time travel.
    pub time_travel_object_count: IntGauge,
    /// Total number of objects that are referenced by the current version.
    pub current_version_object_count: IntGauge,
    /// Total size of objects that are referenced by the current version.
    pub current_version_object_size: IntGauge,
    /// Total number of objects, including dangling objects.
    pub total_object_count: IntGauge,
    /// Total size of objects, including dangling objects.
    pub total_object_size: IntGauge,
    /// Number of objects per table change log.
    pub table_change_log_object_count: IntGaugeVec,
    /// Size of objects per table change log.
    pub table_change_log_object_size: IntGaugeVec,
    /// The number of hummock version delta logs.
    pub delta_log_count: IntGauge,
    /// Latency of version checkpoint.
    pub version_checkpoint_latency: Histogram,
    /// Latency for hummock manager to acquire a lock.
    pub hummock_manager_lock_time: HistogramVec,
    /// Latency for hummock manager to actually process a request after acquiring the lock.
    pub hummock_manager_real_process_time: HistogramVec,
    /// The number of compactions from one level to another level that have been skipped.
    pub compact_skip_frequency: IntCounterVec,
    /// Bytes of the LSM tree needed to reach balance.
    pub compact_pending_bytes: IntGaugeVec,
    /// Per-level compression ratio.
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    /// Per-level number of running compaction tasks.
    pub level_compact_task_cnt: IntGaugeVec,
    pub time_after_last_observation: Arc<AtomicU64>,
    pub l0_compact_level_count: HistogramVec,
    pub compact_task_size: HistogramVec,
    pub compact_task_file_count: HistogramVec,
    pub compact_task_batch_count: HistogramVec,
    pub split_compaction_group_count: IntCounterVec,
    pub state_table_count: IntGaugeVec,
    pub branched_sst_count: IntGaugeVec,
    pub compact_task_trivial_move_sst_count: HistogramVec,

    pub compaction_event_consumed_latency: Histogram,
    pub compaction_event_loop_iteration_latency: Histogram,

    // ********************************** Object Store ************************************
    // Object store related metrics (for backup/restore and version checkpoint)
    pub object_store_metric: Arc<ObjectStoreMetrics>,

    // ********************************** Source ************************************
    /// Whether each source is still up.
    pub source_is_up: LabelGuardedIntGaugeVec,
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

    // ********************************** Fragment ************************************
    /// A dummy gauge metric whose labels carry the mapping from actor id to fragment id.
    pub actor_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry the mapping from table id to actor id.
    pub table_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry the mapping from actor id to sink id.
    pub sink_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry relation info.
    pub relation_info: IntGaugeVec,

    /// Write throughput of commit epoch for each table.
    pub table_write_throughput: IntCounterVec,

    /// The number of compaction groups that have been triggered to merge.
    pub merge_compaction_group_count: IntCounterVec,

    // ********************************** Auto Schema Change ************************************
    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_latency: LabelGuardedHistogramVec,

    pub time_travel_version_replay_latency: Histogram,

    pub compaction_group_count: IntGauge,
    pub compaction_group_size: IntGaugeVec,
    pub compaction_group_file_count: IntGaugeVec,
    pub compaction_group_throughput: IntGaugeVec,
}

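/// The global [`MetaMetrics`] instance, lazily initialized and registered against the global
/// metrics registry.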
pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl MetaMetrics {
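    /// Creates all meta metrics and registers them against the given registry.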
    fn new(registry: &Registry) -> Self {
        let opts = histogram_opts!(
            "meta_grpc_duration_seconds",
            "gRPC latency of meta services",
            exponential_buckets(0.0001, 2.0, 20).unwrap() // max 52s
        );
        let grpc_latency =
            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_duration_seconds",
            "barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let barrier_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();

        let opts = histogram_opts!(
            "meta_barrier_wait_commit_duration_seconds",
            "barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let barrier_wait_commit_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_send_duration_seconds",
            "barrier send latency",
            exponential_buckets(0.1, 1.5, 19).unwrap() // max 148s
        );
        let barrier_send_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();
        let barrier_interval_by_database = register_gauge_vec_with_registry!(
            "meta_barrier_interval_by_database",
            "barrier interval of each database",
            &["database_id"],
            registry
        )
        .unwrap();

        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "all_barrier_nums",
            "num of all barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "in_flight_barrier_nums",
            "num of in-flight barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
            "last_committed_barrier_time",
            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
            &["database_id"],
            registry
        )
        .unwrap();

        // snapshot backfill metrics
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_duration_seconds",
            "snapshot backfill barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "barrier_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
            "snapshot backfill barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let snapshot_backfill_wait_commit_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();

        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_upstream_lag",
            "snapshot backfill upstream_lag",
            &["table_id"],
            registry
        )
        .unwrap();
        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_inflight_barrier_num",
            "snapshot backfill inflight_barrier_num",
            &["table_id"],
            registry
        )
        .unwrap();

        let max_committed_epoch = register_int_gauge_with_registry!(
            "storage_max_committed_epoch",
            "max committed epoch",
            registry
        )
        .unwrap();

        let min_committed_epoch = register_int_gauge_with_registry!(
            "storage_min_committed_epoch",
            "min committed epoch",
            registry
        )
        .unwrap();

        let level_sst_num = register_int_gauge_vec_with_registry!(
            "storage_level_sst_num",
            "num of SSTs in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let level_compact_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_cnt",
            "num of SSTs to be merged to next level in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let compact_frequency = register_int_counter_vec_with_registry!(
            "storage_level_compact_frequency",
            "The number of compactions from one level to another level that have completed or failed.",
            &["compactor", "group", "task_type", "result"],
            registry
        )
        .unwrap();
        let compact_skip_frequency = register_int_counter_vec_with_registry!(
            "storage_skip_compact_frequency",
            "The number of compactions from one level to another level that have been skipped.",
            &["level", "type"],
            registry
        )
        .unwrap();

        let version_size =
            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
                .unwrap();

        let current_version_id = register_int_gauge_with_registry!(
            "storage_current_version_id",
            "current version id",
            registry
        )
        .unwrap();

        let checkpoint_version_id = register_int_gauge_with_registry!(
            "storage_checkpoint_version_id",
            "checkpoint version id",
            registry
        )
        .unwrap();

        let min_pinned_version_id = register_int_gauge_with_registry!(
            "storage_min_pinned_version_id",
            "min pinned version id",
            registry
        )
        .unwrap();

        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
            "storage_write_stop_compaction_groups",
            "compaction groups in write-stop state",
            &["compaction_group_id"],
            registry
        )
        .unwrap();

        let full_gc_trigger_count = register_int_gauge_with_registry!(
            "storage_full_gc_trigger_count",
            "the number of attempts to trigger full GC",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_candidate_object_count",
            "the number of candidate objects to delete after scanning the object store",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_candidate_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_selected_object_count",
            "the number of objects to delete after filtering by the meta node",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_selected_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let min_safepoint_version_id = register_int_gauge_with_registry!(
            "storage_min_safepoint_version_id",
            "min safepoint version id",
            registry
        )
        .unwrap();

        let level_file_size = register_int_gauge_vec_with_registry!(
            "storage_level_total_file_size",
            "total file size in KB of each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let version_stats = register_int_gauge_vec_with_registry!(
            "storage_version_stats",
            "per table stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let materialized_view_stats = register_int_gauge_vec_with_registry!(
            "storage_materialized_view_stats",
            "per materialized view stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let stale_object_count = register_int_gauge_with_registry!(
            "storage_stale_object_count",
            "total number of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let stale_object_size = register_int_gauge_with_registry!(
            "storage_stale_object_size",
            "total size of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let old_version_object_count = register_int_gauge_with_registry!(
            "storage_old_version_object_count",
            "total number of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let old_version_object_size = register_int_gauge_with_registry!(
            "storage_old_version_object_size",
            "total size of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let current_version_object_count = register_int_gauge_with_registry!(
            "storage_current_version_object_count",
            "total number of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let current_version_object_size = register_int_gauge_with_registry!(
            "storage_current_version_object_size",
            "total size of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let total_object_count = register_int_gauge_with_registry!(
            "storage_total_object_count",
            "Total number of objects, including dangling objects. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let total_object_size = register_int_gauge_with_registry!(
            "storage_total_object_size",
            "Total size of objects, including dangling objects. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_count",
            "per table change log object count",
            &["table_id"],
            registry
        )
        .unwrap();

        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_size",
            "per table change log object size",
            &["table_id"],
            registry
        )
        .unwrap();

        let time_travel_object_count = register_int_gauge_with_registry!(
            "storage_time_travel_object_count",
            "total number of objects that are referenced by time travel.",
            registry
        )
        .unwrap();

        let delta_log_count = register_int_gauge_with_registry!(
            "storage_delta_log_count",
            "total number of hummock version delta logs",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_version_checkpoint_latency",
            "hummock version checkpoint latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();

        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
            "hummock_manager_lock_time",
            "latency for hummock manager to acquire the rwlock",
            &["lock_name", "lock_type"],
            registry
        )
        .unwrap();

        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
            "meta_hummock_manager_real_process_time",
            "latency for hummock manager to really process the request",
            &["method"],
            registry
        )
        .unwrap();

        let worker_num = register_int_gauge_vec_with_registry!(
            "worker_num",
            "number of nodes in the cluster",
            &["worker_type"],
            registry,
        )
        .unwrap();

        let meta_type = register_int_gauge_vec_with_registry!(
            "meta_num",
            "role of meta nodes in the cluster",
            &["worker_addr", "role"],
            registry,
        )
        .unwrap();

        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
            "storage_compact_pending_bytes",
            "bytes of lsm tree needed to reach balance",
            &["group"],
            registry
        )
        .unwrap();

        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
            "storage_compact_level_compression_ratio",
            "compression ratio of each level of the lsm tree",
            &["group", "level", "algorithm"],
            registry
        )
        .unwrap();

        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_task_cnt",
            "num of compact_task organized by group and level",
            &["task"],
            registry
        )
        .unwrap();
        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());

        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
            "recovery_failure_cnt",
            "Number of failed recovery attempts",
            &["recovery_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "recovery_latency",
            "Latency of the recovery process",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let recovery_latency =
            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();

        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_failure_cnt",
            "Number of failed auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_success_cnt",
            "Number of successful auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "auto_schema_change_latency",
            "Latency of the auto schema change process",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
            "source_status_is_up",
            "source is up or not",
            &["source_id", "source_name"],
            registry
        )
        .unwrap();
        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());

        let actor_info = register_int_gauge_vec_with_registry!(
            "actor_info",
            "Mapping from actor id to (fragment id, compute node)",
            &["actor_id", "fragment_id", "compute_node"],
            registry
        )
        .unwrap();

        let table_info = register_int_gauge_vec_with_registry!(
            "table_info",
            "Mapping from table id to (fragment id, table name, table type, compaction group id)",
            &[
                "materialized_view_id",
                "table_id",
                "fragment_id",
                "table_name",
                "table_type",
                "compaction_group_id"
            ],
            registry
        )
        .unwrap();

        let sink_info = register_int_gauge_vec_with_registry!(
            "sink_info",
            "Mapping from actor id to (sink id, sink name)",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let relation_info = register_int_gauge_vec_with_registry!(
            "relation_info",
            "Information of the database relation (table/source/sink)",
            &["id", "database", "schema", "name", "resource_group", "type"],
            registry
        )
        .unwrap();

        let l0_compact_level_count = register_histogram_vec_with_registry!(
            "storage_l0_compact_level_count",
            "level_count of l0 compact task",
            &["group", "type"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_size",
            "Total size of compact tasks that have been issued to the state store",
            exponential_buckets(1048576.0, 2.0, 16).unwrap()
        );

        let compact_task_size =
            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();

        let compact_task_file_count = register_histogram_vec_with_registry!(
            "storage_compact_task_file_count",
            "file count of compact task",
            &["group", "type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "storage_compact_task_batch_count",
            "count of compact task batch",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_batch_count =
            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();

        let table_write_throughput = register_int_counter_vec_with_registry!(
            "storage_commit_write_throughput",
            "The write throughput of commit epoch for each table.",
            &["table_id"],
            registry
        )
        .unwrap();

        let split_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_split_compaction_group_count",
            "Count of triggered compaction group splits",
            &["group"],
            registry
        )
        .unwrap();

        let state_table_count = register_int_gauge_vec_with_registry!(
            "storage_state_table_count",
            "Count of state tables per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let branched_sst_count = register_int_gauge_vec_with_registry!(
            "storage_branched_sst_count",
            "Count of branched ssts per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_consumed_latency",
            "The latency (ms) of each event being consumed",
            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
        );
        let compaction_event_consumed_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_loop_iteration_latency",
            "The latency (ms) of each iteration of the compaction event loop",
            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
        );
        let compaction_event_loop_iteration_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_merge_compaction_group_count",
            "Count of triggered compaction group merges",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_time_travel_version_replay_latency",
            "The latency (ms) of replaying a hummock version for time travel",
            exponential_buckets(0.01, 10.0, 6).unwrap()
        );
        let time_travel_version_replay_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let compaction_group_count = register_int_gauge_with_registry!(
            "storage_compaction_group_count",
            "The number of compaction groups",
            registry,
        )
        .unwrap();

        let compaction_group_size = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_size",
            "The size of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_file_count",
            "The file count of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_throughput",
            "The throughput of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_trivial_move_sst_count",
            "sst count of trivial-move compact tasks",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_trivial_move_sst_count =
            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();

        Self {
            grpc_latency,
            barrier_latency,
            barrier_wait_commit_latency,
            barrier_send_latency,
            all_barrier_nums,
            in_flight_barrier_nums,
            last_committed_barrier_time,
            barrier_interval_by_database,
            snapshot_backfill_barrier_latency,
            snapshot_backfill_wait_commit_latency,
            snapshot_backfill_lag,
            snapshot_backfill_inflight_barrier_num,
            recovery_failure_cnt,
            recovery_latency,

            max_committed_epoch,
            min_committed_epoch,
            level_sst_num,
            level_compact_cnt,
            compact_frequency,
            compact_skip_frequency,
            level_file_size,
            version_size,
            version_stats,
            materialized_view_stats,
            stale_object_count,
            stale_object_size,
            old_version_object_count,
            old_version_object_size,
            time_travel_object_count,
            current_version_object_count,
            current_version_object_size,
            total_object_count,
            total_object_size,
            table_change_log_object_count,
            table_change_log_object_size,
            delta_log_count,
            version_checkpoint_latency,
            current_version_id,
            checkpoint_version_id,
            min_pinned_version_id,
            min_safepoint_version_id,
            write_stop_compaction_groups,
            full_gc_trigger_count,
            full_gc_candidate_object_count,
            full_gc_selected_object_count,
            hummock_manager_lock_time,
            hummock_manager_real_process_time,
            time_after_last_observation: Arc::new(AtomicU64::new(0)),
            worker_num,
            meta_type,
            compact_pending_bytes,
            compact_level_compression_ratio,
            level_compact_task_cnt,
            object_store_metric,
            source_is_up,
            source_enumerator_metrics,
            actor_info,
            table_info,
            sink_info,
            relation_info,
            l0_compact_level_count,
            compact_task_size,
            compact_task_file_count,
            compact_task_batch_count,
            compact_task_trivial_move_sst_count,
            table_write_throughput,
            split_compaction_group_count,
            state_table_count,
            branched_sst_count,
            compaction_event_consumed_latency,
            compaction_event_loop_iteration_latency,
            auto_schema_change_failure_cnt,
            auto_schema_change_success_cnt,
            auto_schema_change_latency,
            merge_compaction_group_count,
            time_travel_version_replay_latency,
            compaction_group_count,
            compaction_group_size,
            compaction_group_file_count,
            compaction_group_throughput,
        }
    }

    #[cfg(test)]
    pub fn for_test(registry: &Registry) -> Self {
        Self::new(registry)
    }
}

impl Default for MetaMetrics {
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}

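/// Spawns a background task that periodically reports the number of workers of each type and the
/// role (leader/follower) of each meta node. Returns the task's join handle and a shutdown sender.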
pub fn start_worker_info_monitor(
    metadata_manager: MetadataManager,
    election_client: ElectionClientRef,
    interval: Duration,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval = tokio::time::interval(interval);
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for interval
                _ = monitor_interval.tick() => {},
                // Shutdown monitor
                _ = &mut shutdown_rx => {
                    tracing::info!("Worker number monitor is stopped");
                    return;
                }
            }

            let node_map = match metadata_manager.count_worker_node().await {
                Ok(node_map) => node_map,
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
                    continue;
                }
            };

            // Reset metrics to clear stale labels, e.g. invalid lease ids.
            meta_metrics.worker_num.reset();
            meta_metrics.meta_type.reset();

            for (worker_type, worker_num) in node_map {
                meta_metrics
                    .worker_num
                    .with_label_values(&[(worker_type.as_str_name())])
                    .set(worker_num as i64);
            }
            if let Ok(meta_members) = election_client.get_members().await {
                meta_metrics
                    .worker_num
                    .with_label_values(&[WorkerType::Meta.as_str_name()])
                    .set(meta_members.len() as i64);
                meta_members.into_iter().for_each(|m| {
                    let role = if m.is_leader { "leader" } else { "follower" };
                    meta_metrics
                        .meta_type
                        .with_label_values(&[m.id.as_str(), role])
                        .set(1);
                });
            }
        }
    });

    (join_handle, shutdown_tx)
}

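/// Refreshes the dummy gauges that map actors to fragments, sinks and state tables, based on the
/// current cluster and catalog state. The gauges are reset first, so stale labels are dropped.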
pub async fn refresh_fragment_info_metrics(
    catalog_controller: &CatalogControllerRef,
    cluster_controller: &ClusterControllerRef,
    hummock_manager: &HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let worker_nodes = match cluster_controller
        .list_workers(Some(WorkerType::ComputeNode.into()), None)
        .await
    {
        Ok(worker_nodes) => worker_nodes,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list worker node");
            return;
        }
    };
    let actor_locations = match catalog_controller.list_actor_locations().await {
        Ok(actor_locations) => actor_locations,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
            return;
        }
    };
    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
        Ok(sink_actor_mapping) => sink_actor_mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
            return;
        }
    };
    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
        Ok(fragment_state_tables) => fragment_state_tables,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
            return;
        }
    };
    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
        Ok(mapping) => mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
            return;
        }
    };

    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
        .into_iter()
        .map(|worker_node| {
            let addr = match worker_node.host {
                Some(host) => format!("{}:{}", host.host, host.port),
                None => "".to_owned(),
            };
            (worker_node.id as WorkerId, addr)
        })
        .collect();
    let table_compaction_group_id_mapping = hummock_manager
        .get_table_compaction_group_id_mapping()
        .await;

    // Start fresh with a reset to clear all outdated labels. This is safe since we always
    // report full info on each interval.
    meta_metrics.actor_info.reset();
    meta_metrics.table_info.reset();
    meta_metrics.sink_info.reset();
    for actor_location in actor_locations {
        let actor_id_str = actor_location.actor_id.to_string();
        let fragment_id_str = actor_location.fragment_id.to_string();
        // Report a dummy gauge metric with (fragment id, actor id, node
        // address) as its label
        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
            meta_metrics
                .actor_info
                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
                .set(1);
        }
    }
    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
        let sink_id_str = sink_id.to_string();
        for actor_id in actor_ids {
            let actor_id_str = actor_id.to_string();
            meta_metrics
                .sink_info
                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
                .set(1);
        }
    }
    for PartialFragmentStateTables {
        fragment_id,
        job_id,
        state_table_ids,
    } in fragment_state_tables
    {
        let fragment_id_str = fragment_id.to_string();
        let job_id_str = job_id.to_string();
        for table_id in state_table_ids.into_inner() {
            let table_id_str = table_id.to_string();
            let (table_name, table_type) = table_name_and_type_mapping
                .get(&table_id)
                .cloned()
                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
            let compaction_group_id = table_compaction_group_id_mapping
                .get(&(table_id as u32))
                .map(|cg_id| cg_id.to_string())
                .unwrap_or_else(|| "unknown".to_owned());
            meta_metrics
                .table_info
                .with_label_values(&[
                    &job_id_str,
                    &table_id_str,
                    &fragment_id_str,
                    &table_name,
                    &table_type,
                    &compaction_group_id,
                ])
                .set(1);
        }
    }
}

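/// Refreshes the `relation_info` gauge with the current set of tables, sources and sinks.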
pub async fn refresh_relation_info_metrics(
    catalog_controller: &CatalogControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let table_objects = match catalog_controller.list_table_objects().await {
        Ok(table_objects) => table_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table objects");
            return;
        }
    };

    let source_objects = match catalog_controller.list_source_objects().await {
        Ok(source_objects) => source_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get source objects");
            return;
        }
    };

    let sink_objects = match catalog_controller.list_sink_objects().await {
        Ok(sink_objects) => sink_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
            return;
        }
    };

    meta_metrics.relation_info.reset();

    for (id, db, schema, name, resource_group) in table_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                "table",
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in source_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                "source",
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in sink_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                "sink",
            ])
            .set(1);
    }
}

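/// Spawns a background task that refreshes fragment and relation info metrics every
/// `COLLECT_INTERVAL_SECONDS`. Returns the task's join handle and a shutdown sender.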
pub fn start_fragment_info_monitor(
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    const COLLECT_INTERVAL_SECONDS: u64 = 60;

    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval =
            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for interval
                _ = monitor_interval.tick() => {},
                // Shutdown monitor
                _ = &mut shutdown_rx => {
                    tracing::info!("Fragment info monitor is stopped");
                    return;
                }
            }

            refresh_fragment_info_metrics(
                &metadata_manager.catalog_controller,
                &metadata_manager.cluster_controller,
                &hummock_manager,
                meta_metrics.clone(),
            )
            .await;

            refresh_relation_info_metrics(
                &metadata_manager.catalog_controller,
                meta_metrics.clone(),
            )
            .await;
        }
    });

    (join_handle, shutdown_tx)
}