risingwave_meta/rpc/
metrics.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::AtomicU64;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicF64, GenericGaugeVec};
21use prometheus::{
22    GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
23    exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
24    register_histogram_vec_with_registry, register_histogram_with_registry,
25    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
26    register_int_gauge_with_registry,
27};
28use risingwave_common::metrics::{
29    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
30    LabelGuardedUintGaugeVec,
31};
32use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::{
35    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
36    register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
37};
38use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
39use risingwave_meta_model::WorkerId;
40use risingwave_meta_model::table::TableType;
41use risingwave_object_store::object::object_metrics::{
42    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
43};
44use risingwave_pb::common::WorkerType;
45use thiserror_ext::AsReport;
46use tokio::sync::oneshot::Sender;
47use tokio::task::JoinHandle;
48
49use crate::controller::catalog::CatalogControllerRef;
50use crate::controller::cluster::ClusterControllerRef;
51use crate::controller::system_param::SystemParamsControllerRef;
52use crate::controller::utils::PartialFragmentStateTables;
53use crate::hummock::HummockManagerRef;
54use crate::manager::MetadataManager;
55use crate::rpc::ElectionClientRef;
56
57#[derive(Clone)]
58pub struct MetaMetrics {
59    // ********************************** Meta ************************************
60    /// The number of workers in the cluster.
61    pub worker_num: IntGaugeVec,
62    /// The roles of all meta nodes in the cluster.
63    pub meta_type: IntGaugeVec,
64
65    // ********************************** gRPC ************************************
66    /// gRPC latency of meta services
67    pub grpc_latency: HistogramVec,
68
69    // ********************************** Barrier ************************************
70    /// The duration from barrier injection to commit
71    /// It is the sum of inflight-latency, sync-latency and wait-commit-latency
72    pub barrier_latency: LabelGuardedHistogramVec,
73    /// The duration from barrier complete to commit
74    pub barrier_wait_commit_latency: Histogram,
75    /// Latency between each barrier send
76    pub barrier_send_latency: LabelGuardedHistogramVec,
77    /// The number of all barriers. It is the sum of barriers that are in-flight or completed but
78    /// waiting for other barriers
79    pub all_barrier_nums: LabelGuardedIntGaugeVec,
80    /// The number of in-flight barriers
81    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
82    /// The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.
83    pub last_committed_barrier_time: IntGaugeVec,
84    /// The barrier interval of each database
85    pub barrier_interval_by_database: GaugeVec,
86
87    // ********************************** Snapshot Backfill ***************************
88    /// The barrier latency in second of `table_id` and snapshto backfill `barrier_type`
89    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec, // (table_id, barrier_type)
90    /// The latency of commit epoch of `table_id`
91    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec, // (table_id, )
92    /// The lags between the upstream epoch and the downstream epoch.
93    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec, // (table_id, )
94    /// The number of inflight barriers of `table_id`
95    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec, // (table_id, _)
96
97    // ********************************** Recovery ************************************
98    pub recovery_failure_cnt: IntCounterVec,
99    pub recovery_latency: HistogramVec,
100
101    // ********************************** Hummock ************************************
102    /// Max committed epoch
103    pub max_committed_epoch: IntGauge,
104    /// Min committed epoch
105    pub min_committed_epoch: IntGauge,
106    /// The number of SSTs in each level
107    pub level_sst_num: IntGaugeVec,
108    /// The number of SSTs to be merged to next level in each level
109    pub level_compact_cnt: IntGaugeVec,
110    /// The number of compact tasks
111    pub compact_frequency: IntCounterVec,
112    /// Size of each level
113    pub level_file_size: IntGaugeVec,
114    /// Hummock version size
115    pub version_size: IntGauge,
116    /// The version Id of current version.
117    pub current_version_id: IntGauge,
118    /// The version id of checkpoint version.
119    pub checkpoint_version_id: IntGauge,
120    /// The smallest version id that is being pinned by worker nodes.
121    pub min_pinned_version_id: IntGauge,
122    /// The smallest version id that is being guarded by meta node safe points.
123    pub min_safepoint_version_id: IntGauge,
124    /// Compaction groups that is in write stop state.
125    pub write_stop_compaction_groups: IntGaugeVec,
126    /// The number of attempts to trigger full GC.
127    pub full_gc_trigger_count: IntGauge,
128    /// The number of candidate object to delete after scanning object store.
129    pub full_gc_candidate_object_count: Histogram,
130    /// The number of object to delete after filtering by meta node.
131    pub full_gc_selected_object_count: Histogram,
132    /// Hummock version stats
133    pub version_stats: IntGaugeVec,
134    /// Hummock version stats
135    pub materialized_view_stats: IntGaugeVec,
136    /// Total number of objects that is no longer referenced by versions.
137    pub stale_object_count: IntGauge,
138    /// Total size of objects that is no longer referenced by versions.
139    pub stale_object_size: IntGauge,
140    /// Total number of objects that is still referenced by non-current versions.
141    pub old_version_object_count: IntGauge,
142    /// Total size of objects that is still referenced by non-current versions.
143    pub old_version_object_size: IntGauge,
144    /// Total number of objects that is referenced by time travel.
145    pub time_travel_object_count: IntGauge,
146    /// Total number of objects that is referenced by current version.
147    pub current_version_object_count: IntGauge,
148    /// Total size of objects that is referenced by current version.
149    pub current_version_object_size: IntGauge,
150    /// Total number of objects that includes dangling objects.
151    pub total_object_count: IntGauge,
152    /// Total size of objects that includes dangling objects.
153    pub total_object_size: IntGauge,
154    /// Number of objects per table change log.
155    pub table_change_log_object_count: IntGaugeVec,
156    /// Size of objects per table change log.
157    pub table_change_log_object_size: IntGaugeVec,
158    /// The number of hummock version delta log.
159    pub delta_log_count: IntGauge,
160    /// latency of version checkpoint
161    pub version_checkpoint_latency: Histogram,
162    /// Latency for hummock manager to acquire lock
163    pub hummock_manager_lock_time: HistogramVec,
164    /// Latency for hummock manager to really process a request after acquire the lock
165    pub hummock_manager_real_process_time: HistogramVec,
166    /// The number of compactions from one level to another level that have been skipped
167    pub compact_skip_frequency: IntCounterVec,
168    /// Bytes of lsm tree needed to reach balance
169    pub compact_pending_bytes: IntGaugeVec,
170    /// Per level compression ratio
171    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
172    /// Per level number of running compaction task
173    pub level_compact_task_cnt: IntGaugeVec,
174    pub time_after_last_observation: Arc<AtomicU64>,
175    pub l0_compact_level_count: HistogramVec,
176    pub compact_task_size: HistogramVec,
177    pub compact_task_file_count: HistogramVec,
178    pub compact_task_batch_count: HistogramVec,
179    pub split_compaction_group_count: IntCounterVec,
180    pub state_table_count: IntGaugeVec,
181    pub branched_sst_count: IntGaugeVec,
182    pub compact_task_trivial_move_sst_count: HistogramVec,
183
184    pub compaction_event_consumed_latency: Histogram,
185    pub compaction_event_loop_iteration_latency: Histogram,
186    pub time_travel_vacuum_metadata_latency: Histogram,
187    pub time_travel_write_metadata_latency: Histogram,
188
189    // ********************************** Object Store ************************************
190    // Object store related metrics (for backup/restore and version checkpoint)
191    pub object_store_metric: Arc<ObjectStoreMetrics>,
192
193    // ********************************** Source ************************************
194    /// supervisor for which source is still up.
195    pub source_is_up: LabelGuardedIntGaugeVec,
196    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,
197
198    // ********************************** Fragment ************************************
199    /// A dummy gauge metrics with its label to be the mapping from actor id to fragment id
200    pub actor_info: IntGaugeVec,
201    /// A dummy gauge metrics with its label to be the mapping from table id to actor id
202    pub table_info: IntGaugeVec,
203    /// A dummy gauge metrics with its label to be the mapping from actor id to sink id
204    pub sink_info: IntGaugeVec,
205    /// A dummy gauge metrics with its label to be relation info
206    pub relation_info: IntGaugeVec,
207
208    // ********************************** System Params ************************************
209    /// A dummy gauge metric with labels carrying system parameter info.
210    /// Labels: (name, value)
211    pub system_param_info: IntGaugeVec,
212
213    /// Write throughput of commit epoch for each stable
214    pub table_write_throughput: IntCounterVec,
215
216    /// The number of compaction groups that have been triggered to move
217    pub merge_compaction_group_count: IntCounterVec,
218
219    // ********************************** Auto Schema Change ************************************
220    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
221    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
222    pub auto_schema_change_latency: LabelGuardedHistogramVec,
223
224    pub time_travel_version_replay_latency: Histogram,
225
226    pub compaction_group_count: IntGauge,
227    pub compaction_group_size: IntGaugeVec,
228    pub compaction_group_file_count: IntGaugeVec,
229    pub compaction_group_throughput: IntGaugeVec,
230
231    // ********************************** Refresh Manager ************************************
232    pub refresh_job_duration: LabelGuardedUintGaugeVec,
233    pub refresh_job_finish_cnt: LabelGuardedIntCounterVec,
234    pub refresh_cron_job_trigger_cnt: LabelGuardedIntCounterVec,
235    pub refresh_cron_job_miss_cnt: LabelGuardedIntCounterVec,
236}
237
238pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
239    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));
240
241impl MetaMetrics {
242    fn new(registry: &Registry) -> Self {
243        let opts = histogram_opts!(
244            "meta_grpc_duration_seconds",
245            "gRPC latency of meta services",
246            exponential_buckets(0.0001, 2.0, 20).unwrap() // max 52s
247        );
248        let grpc_latency =
249            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();
250
251        let opts = histogram_opts!(
252            "meta_barrier_duration_seconds",
253            "barrier latency",
254            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
255        );
256        let barrier_latency =
257            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
258                .unwrap();
259
260        let opts = histogram_opts!(
261            "meta_barrier_wait_commit_duration_seconds",
262            "barrier_wait_commit_latency",
263            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
264        );
265        let barrier_wait_commit_latency =
266            register_histogram_with_registry!(opts, registry).unwrap();
267
268        let opts = histogram_opts!(
269            "meta_barrier_send_duration_seconds",
270            "barrier send latency",
271            exponential_buckets(0.1, 1.5, 19).unwrap() // max 148s
272        );
273        let barrier_send_latency =
274            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
275                .unwrap();
276        let barrier_interval_by_database = register_gauge_vec_with_registry!(
277            "meta_barrier_interval_by_database",
278            "barrier interval of each database",
279            &["database_id"],
280            registry
281        )
282        .unwrap();
283
284        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
285            "all_barrier_nums",
286            "num of of all_barrier",
287            &["database_id"],
288            registry
289        )
290        .unwrap();
291        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
292            "in_flight_barrier_nums",
293            "num of of in_flight_barrier",
294            &["database_id"],
295            registry
296        )
297        .unwrap();
298        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
299            "last_committed_barrier_time",
300            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
301            &["database_id"],
302            registry
303        )
304        .unwrap();
305
306        // snapshot backfill metrics
307        let opts = histogram_opts!(
308            "meta_snapshot_backfill_barrier_duration_seconds",
309            "snapshot backfill barrier latency",
310            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
311        );
312        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
313            opts,
314            &["table_id", "barrier_type"],
315            registry
316        )
317        .unwrap();
318        let opts = histogram_opts!(
319            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
320            "snapshot backfill barrier_wait_commit_latency",
321            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
322        );
323        let snapshot_backfill_wait_commit_latency =
324            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
325
326        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
327            "meta_snapshot_backfill_upstream_lag",
328            "snapshot backfill upstream_lag",
329            &["table_id"],
330            registry
331        )
332        .unwrap();
333        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
334            "meta_snapshot_backfill_inflight_barrier_num",
335            "snapshot backfill inflight_barrier_num",
336            &["table_id"],
337            registry
338        )
339        .unwrap();
340
341        let max_committed_epoch = register_int_gauge_with_registry!(
342            "storage_max_committed_epoch",
343            "max committed epoch",
344            registry
345        )
346        .unwrap();
347
348        let min_committed_epoch = register_int_gauge_with_registry!(
349            "storage_min_committed_epoch",
350            "min committed epoch",
351            registry
352        )
353        .unwrap();
354
355        let level_sst_num = register_int_gauge_vec_with_registry!(
356            "storage_level_sst_num",
357            "num of SSTs in each level",
358            &["level_index"],
359            registry
360        )
361        .unwrap();
362
363        let level_compact_cnt = register_int_gauge_vec_with_registry!(
364            "storage_level_compact_cnt",
365            "num of SSTs to be merged to next level in each level",
366            &["level_index"],
367            registry
368        )
369        .unwrap();
370
371        let compact_frequency = register_int_counter_vec_with_registry!(
372            "storage_level_compact_frequency",
373            "The number of compactions from one level to another level that have completed or failed.",
374            &["compactor", "group", "task_type", "result"],
375            registry
376        )
377        .unwrap();
378        let compact_skip_frequency = register_int_counter_vec_with_registry!(
379            "storage_skip_compact_frequency",
380            "The number of compactions from one level to another level that have been skipped.",
381            &["level", "type"],
382            registry
383        )
384        .unwrap();
385
386        let version_size =
387            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
388                .unwrap();
389
390        let current_version_id = register_int_gauge_with_registry!(
391            "storage_current_version_id",
392            "current version id",
393            registry
394        )
395        .unwrap();
396
397        let checkpoint_version_id = register_int_gauge_with_registry!(
398            "storage_checkpoint_version_id",
399            "checkpoint version id",
400            registry
401        )
402        .unwrap();
403
404        let min_pinned_version_id = register_int_gauge_with_registry!(
405            "storage_min_pinned_version_id",
406            "min pinned version id",
407            registry
408        )
409        .unwrap();
410
411        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
412            "storage_write_stop_compaction_groups",
413            "compaction groups of write stop state",
414            &["compaction_group_id"],
415            registry
416        )
417        .unwrap();
418
419        let full_gc_trigger_count = register_int_gauge_with_registry!(
420            "storage_full_gc_trigger_count",
421            "the number of attempts to trigger full GC",
422            registry
423        )
424        .unwrap();
425
426        let opts = histogram_opts!(
427            "storage_full_gc_candidate_object_count",
428            "the number of candidate object to delete after scanning object store",
429            exponential_buckets(1.0, 10.0, 6).unwrap()
430        );
431        let full_gc_candidate_object_count =
432            register_histogram_with_registry!(opts, registry).unwrap();
433
434        let opts = histogram_opts!(
435            "storage_full_gc_selected_object_count",
436            "the number of object to delete after filtering by meta node",
437            exponential_buckets(1.0, 10.0, 6).unwrap()
438        );
439        let full_gc_selected_object_count =
440            register_histogram_with_registry!(opts, registry).unwrap();
441
442        let min_safepoint_version_id = register_int_gauge_with_registry!(
443            "storage_min_safepoint_version_id",
444            "min safepoint version id",
445            registry
446        )
447        .unwrap();
448
449        let level_file_size = register_int_gauge_vec_with_registry!(
450            "storage_level_total_file_size",
451            "KBs total file bytes in each level",
452            &["level_index"],
453            registry
454        )
455        .unwrap();
456
457        let version_stats = register_int_gauge_vec_with_registry!(
458            "storage_version_stats",
459            "per table stats in current hummock version",
460            &["table_id", "metric"],
461            registry
462        )
463        .unwrap();
464
465        let materialized_view_stats = register_int_gauge_vec_with_registry!(
466            "storage_materialized_view_stats",
467            "per materialized view stats in current hummock version",
468            &["table_id", "metric"],
469            registry
470        )
471        .unwrap();
472
473        let stale_object_count = register_int_gauge_with_registry!(
474            "storage_stale_object_count",
475            "total number of objects that is no longer referenced by versions.",
476            registry
477        )
478        .unwrap();
479
480        let stale_object_size = register_int_gauge_with_registry!(
481            "storage_stale_object_size",
482            "total size of objects that is no longer referenced by versions.",
483            registry
484        )
485        .unwrap();
486
487        let old_version_object_count = register_int_gauge_with_registry!(
488            "storage_old_version_object_count",
489            "total number of objects that is still referenced by non-current versions",
490            registry
491        )
492        .unwrap();
493
494        let old_version_object_size = register_int_gauge_with_registry!(
495            "storage_old_version_object_size",
496            "total size of objects that is still referenced by non-current versions",
497            registry
498        )
499        .unwrap();
500
501        let current_version_object_count = register_int_gauge_with_registry!(
502            "storage_current_version_object_count",
503            "total number of objects that is referenced by current version",
504            registry
505        )
506        .unwrap();
507
508        let current_version_object_size = register_int_gauge_with_registry!(
509            "storage_current_version_object_size",
510            "total size of objects that is referenced by current version",
511            registry
512        )
513        .unwrap();
514
515        let total_object_count = register_int_gauge_with_registry!(
516            "storage_total_object_count",
517            "Total number of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
518            registry
519        ).unwrap();
520
521        let total_object_size = register_int_gauge_with_registry!(
522            "storage_total_object_size",
523            "Total size of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
524            registry
525        ).unwrap();
526
527        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
528            "storage_table_change_log_object_count",
529            "per table change log object count",
530            &["table_id"],
531            registry
532        )
533        .unwrap();
534
535        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
536            "storage_table_change_log_object_size",
537            "per table change log object size",
538            &["table_id"],
539            registry
540        )
541        .unwrap();
542
543        let time_travel_object_count = register_int_gauge_with_registry!(
544            "storage_time_travel_object_count",
545            "total number of objects that is referenced by time travel.",
546            registry
547        )
548        .unwrap();
549
550        let delta_log_count = register_int_gauge_with_registry!(
551            "storage_delta_log_count",
552            "total number of hummock version delta log",
553            registry
554        )
555        .unwrap();
556
557        let opts = histogram_opts!(
558            "storage_version_checkpoint_latency",
559            "hummock version checkpoint latency",
560            exponential_buckets(0.1, 1.5, 20).unwrap()
561        );
562        let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();
563
564        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
565            "hummock_manager_lock_time",
566            "latency for hummock manager to acquire the rwlock",
567            &["lock_name", "lock_type"],
568            registry
569        )
570        .unwrap();
571
572        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
573            "meta_hummock_manager_real_process_time",
574            "latency for hummock manager to really process the request",
575            &["method"],
576            registry
577        )
578        .unwrap();
579
580        let worker_num = register_int_gauge_vec_with_registry!(
581            "worker_num",
582            "number of nodes in the cluster",
583            &["worker_type"],
584            registry,
585        )
586        .unwrap();
587
588        let meta_type = register_int_gauge_vec_with_registry!(
589            "meta_num",
590            "role of meta nodes in the cluster",
591            &["worker_addr", "role"],
592            registry,
593        )
594        .unwrap();
595
596        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
597            "storage_compact_pending_bytes",
598            "bytes of lsm tree needed to reach balance",
599            &["group"],
600            registry
601        )
602        .unwrap();
603
604        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
605            "storage_compact_level_compression_ratio",
606            "compression ratio of each level of the lsm tree",
607            &["group", "level", "algorithm"],
608            registry
609        )
610        .unwrap();
611
612        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
613            "storage_level_compact_task_cnt",
614            "num of compact_task organized by group and level",
615            &["task"],
616            registry
617        )
618        .unwrap();
619
620        let time_travel_vacuum_metadata_latency = register_histogram_with_registry!(
621            histogram_opts!(
622                "storage_time_travel_vacuum_metadata_latency",
623                "Latency of vacuuming metadata for time travel",
624                exponential_buckets(0.1, 1.5, 20).unwrap()
625            ),
626            registry
627        )
628        .unwrap();
629        let time_travel_write_metadata_latency = register_histogram_with_registry!(
630            histogram_opts!(
631                "storage_time_travel_write_metadata_latency",
632                "Latency of writing metadata for time travel",
633                exponential_buckets(0.1, 1.5, 20).unwrap()
634            ),
635            registry
636        )
637        .unwrap();
638
639        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
640
641        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
642            "recovery_failure_cnt",
643            "Number of failed recovery attempts",
644            &["recovery_type"],
645            registry
646        )
647        .unwrap();
648        let opts = histogram_opts!(
649            "recovery_latency",
650            "Latency of the recovery process",
651            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
652        );
653        let recovery_latency =
654            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();
655
656        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
657            "auto_schema_change_failure_cnt",
658            "Number of failed auto schema change",
659            &["table_id", "table_name"],
660            registry
661        )
662        .unwrap();
663
664        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
665            "auto_schema_change_success_cnt",
666            "Number of success auto schema change",
667            &["table_id", "table_name"],
668            registry
669        )
670        .unwrap();
671
672        let opts = histogram_opts!(
673            "auto_schema_change_latency",
674            "Latency of the auto schema change process",
675            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
676        );
677        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
678            opts,
679            &["table_id", "table_name"],
680            registry
681        )
682        .unwrap();
683
684        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
685            "source_status_is_up",
686            "source is up or not",
687            &["source_id", "source_name"],
688            registry
689        )
690        .unwrap();
691        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());
692
693        let actor_info = register_int_gauge_vec_with_registry!(
694            "actor_info",
695            "Mapping from actor id to (fragment id, compute node)",
696            &["actor_id", "fragment_id", "compute_node"],
697            registry
698        )
699        .unwrap();
700
701        let table_info = register_int_gauge_vec_with_registry!(
702            "table_info",
703            "Mapping from table id to (actor id, table name)",
704            &[
705                "materialized_view_id",
706                "table_id",
707                "fragment_id",
708                "table_name",
709                "table_type",
710                "compaction_group_id"
711            ],
712            registry
713        )
714        .unwrap();
715
716        let sink_info = register_int_gauge_vec_with_registry!(
717            "sink_info",
718            "Mapping from actor id to (actor id, sink name)",
719            &["actor_id", "sink_id", "sink_name",],
720            registry
721        )
722        .unwrap();
723
724        let relation_info = register_int_gauge_vec_with_registry!(
725            "relation_info",
726            "Information of the database relation (table/source/sink/materialized view/index/internal)",
727            &["id", "database", "schema", "name", "resource_group", "type"],
728            registry
729        )
730        .unwrap();
731
732        let l0_compact_level_count = register_histogram_vec_with_registry!(
733            "storage_l0_compact_level_count",
734            "level_count of l0 compact task",
735            &["group", "type"],
736            registry
737        )
738        .unwrap();
739
740        // System parameter info
741        let system_param_info = register_int_gauge_vec_with_registry!(
742            "system_param_info",
743            "Information of system parameters",
744            &["name", "value"],
745            registry
746        )
747        .unwrap();
748
749        let opts = histogram_opts!(
750            "storage_compact_task_size",
751            "Total size of compact that have been issued to state store",
752            exponential_buckets(1048576.0, 2.0, 16).unwrap()
753        );
754
755        let compact_task_size =
756            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();
757
758        let compact_task_file_count = register_histogram_vec_with_registry!(
759            "storage_compact_task_file_count",
760            "file count of compact task",
761            &["group", "type"],
762            registry
763        )
764        .unwrap();
765        let opts = histogram_opts!(
766            "storage_compact_task_batch_count",
767            "count of compact task batch",
768            exponential_buckets(1.0, 2.0, 8).unwrap()
769        );
770        let compact_task_batch_count =
771            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();
772
773        let table_write_throughput = register_int_counter_vec_with_registry!(
774            "storage_commit_write_throughput",
775            "The number of compactions from one level to another level that have been skipped.",
776            &["table_id"],
777            registry
778        )
779        .unwrap();
780
781        let split_compaction_group_count = register_int_counter_vec_with_registry!(
782            "storage_split_compaction_group_count",
783            "Count of trigger split compaction group",
784            &["group"],
785            registry
786        )
787        .unwrap();
788
789        let state_table_count = register_int_gauge_vec_with_registry!(
790            "storage_state_table_count",
791            "Count of stable table per compaction group",
792            &["group"],
793            registry
794        )
795        .unwrap();
796
797        let branched_sst_count = register_int_gauge_vec_with_registry!(
798            "storage_branched_sst_count",
799            "Count of branched sst per compaction group",
800            &["group"],
801            registry
802        )
803        .unwrap();
804
805        let opts = histogram_opts!(
806            "storage_compaction_event_consumed_latency",
807            "The latency(ms) of each event being consumed",
808            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
809        );
810        let compaction_event_consumed_latency =
811            register_histogram_with_registry!(opts, registry).unwrap();
812
813        let opts = histogram_opts!(
814            "storage_compaction_event_loop_iteration_latency",
815            "The latency(ms) of each iteration of the compaction event loop",
816            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
817        );
818        let compaction_event_loop_iteration_latency =
819            register_histogram_with_registry!(opts, registry).unwrap();
820
821        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
822            "storage_merge_compaction_group_count",
823            "Count of trigger merge compaction group",
824            &["group"],
825            registry
826        )
827        .unwrap();
828
829        let opts = histogram_opts!(
830            "storage_time_travel_version_replay_latency",
831            "The latency(ms) of replaying a hummock version for time travel",
832            exponential_buckets(0.01, 10.0, 6).unwrap()
833        );
834        let time_travel_version_replay_latency =
835            register_histogram_with_registry!(opts, registry).unwrap();
836
837        let compaction_group_count = register_int_gauge_with_registry!(
838            "storage_compaction_group_count",
839            "The number of compaction groups",
840            registry,
841        )
842        .unwrap();
843
844        let compaction_group_size = register_int_gauge_vec_with_registry!(
845            "storage_compaction_group_size",
846            "The size of compaction group",
847            &["group"],
848            registry
849        )
850        .unwrap();
851
852        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
853            "storage_compaction_group_file_count",
854            "The file count of compaction group",
855            &["group"],
856            registry
857        )
858        .unwrap();
859
860        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
861            "storage_compaction_group_throughput",
862            "The throughput of compaction group",
863            &["group"],
864            registry
865        )
866        .unwrap();
867
868        let opts = histogram_opts!(
869            "storage_compact_task_trivial_move_sst_count",
870            "sst count of compact trivial-move task",
871            exponential_buckets(1.0, 2.0, 8).unwrap()
872        );
873        let compact_task_trivial_move_sst_count =
874            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();
875
876        let refresh_job_duration = register_guarded_uint_gauge_vec_with_registry!(
877            "meta_refresh_job_duration",
878            "The duration of refresh job",
879            &["table_id", "status"],
880            registry
881        )
882        .unwrap();
883        let refresh_job_finish_cnt = register_guarded_int_counter_vec_with_registry!(
884            "meta_refresh_job_finish_cnt",
885            "The number of finished refresh jobs",
886            &["table_id", "status"],
887            registry
888        )
889        .unwrap();
890        let refresh_cron_job_trigger_cnt = register_guarded_int_counter_vec_with_registry!(
891            "meta_refresh_cron_job_trigger_cnt",
892            "The number of cron refresh jobs triggered",
893            &["table_id"],
894            registry
895        )
896        .unwrap();
897        let refresh_cron_job_miss_cnt = register_guarded_int_counter_vec_with_registry!(
898            "meta_refresh_cron_job_miss_cnt",
899            "The number of cron refresh jobs missed",
900            &["table_id"],
901            registry
902        )
903        .unwrap();
904
905        Self {
906            grpc_latency,
907            barrier_latency,
908            barrier_wait_commit_latency,
909            barrier_send_latency,
910            all_barrier_nums,
911            in_flight_barrier_nums,
912            last_committed_barrier_time,
913            barrier_interval_by_database,
914            snapshot_backfill_barrier_latency,
915            snapshot_backfill_wait_commit_latency,
916            snapshot_backfill_lag,
917            snapshot_backfill_inflight_barrier_num,
918            recovery_failure_cnt,
919            recovery_latency,
920
921            max_committed_epoch,
922            min_committed_epoch,
923            level_sst_num,
924            level_compact_cnt,
925            compact_frequency,
926            compact_skip_frequency,
927            level_file_size,
928            version_size,
929            version_stats,
930            materialized_view_stats,
931            stale_object_count,
932            stale_object_size,
933            old_version_object_count,
934            old_version_object_size,
935            time_travel_object_count,
936            current_version_object_count,
937            current_version_object_size,
938            total_object_count,
939            total_object_size,
940            table_change_log_object_count,
941            table_change_log_object_size,
942            delta_log_count,
943            version_checkpoint_latency,
944            current_version_id,
945            checkpoint_version_id,
946            min_pinned_version_id,
947            min_safepoint_version_id,
948            write_stop_compaction_groups,
949            full_gc_trigger_count,
950            full_gc_candidate_object_count,
951            full_gc_selected_object_count,
952            hummock_manager_lock_time,
953            hummock_manager_real_process_time,
954            time_after_last_observation: Arc::new(AtomicU64::new(0)),
955            worker_num,
956            meta_type,
957            compact_pending_bytes,
958            compact_level_compression_ratio,
959            level_compact_task_cnt,
960            object_store_metric,
961            source_is_up,
962            source_enumerator_metrics,
963            actor_info,
964            table_info,
965            sink_info,
966            relation_info,
967            system_param_info,
968            l0_compact_level_count,
969            compact_task_size,
970            compact_task_file_count,
971            compact_task_batch_count,
972            compact_task_trivial_move_sst_count,
973            table_write_throughput,
974            split_compaction_group_count,
975            state_table_count,
976            branched_sst_count,
977            compaction_event_consumed_latency,
978            compaction_event_loop_iteration_latency,
979            auto_schema_change_failure_cnt,
980            auto_schema_change_success_cnt,
981            auto_schema_change_latency,
982            merge_compaction_group_count,
983            time_travel_version_replay_latency,
984            compaction_group_count,
985            compaction_group_size,
986            compaction_group_file_count,
987            compaction_group_throughput,
988            refresh_job_duration,
989            refresh_job_finish_cnt,
990            refresh_cron_job_trigger_cnt,
991            refresh_cron_job_miss_cnt,
992            time_travel_vacuum_metadata_latency,
993            time_travel_write_metadata_latency,
994        }
995    }
996
997    #[cfg(test)]
998    pub fn for_test(registry: &Registry) -> Self {
999        Self::new(registry)
1000    }
1001}
1002impl Default for MetaMetrics {
1003    fn default() -> Self {
1004        GLOBAL_META_METRICS.clone()
1005    }
1006}
1007
1008/// Refresh `system_param_info` metrics by reading current system parameters.
1009pub async fn refresh_system_param_info_metrics(
1010    system_params_controller: &SystemParamsControllerRef,
1011    meta_metrics: Arc<MetaMetrics>,
1012) {
1013    let params_info = system_params_controller.get_params().await.get_all();
1014
1015    meta_metrics.system_param_info.reset();
1016    for info in params_info {
1017        meta_metrics
1018            .system_param_info
1019            .with_label_values(&[info.name, &info.value])
1020            .set(1);
1021    }
1022}
1023
1024pub fn start_worker_info_monitor(
1025    metadata_manager: MetadataManager,
1026    election_client: ElectionClientRef,
1027    interval: Duration,
1028    meta_metrics: Arc<MetaMetrics>,
1029) -> (JoinHandle<()>, Sender<()>) {
1030    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1031    let join_handle = tokio::spawn(async move {
1032        let mut monitor_interval = tokio::time::interval(interval);
1033        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1034        loop {
1035            tokio::select! {
1036                // Wait for interval
1037                _ = monitor_interval.tick() => {},
1038                // Shutdown monitor
1039                _ = &mut shutdown_rx => {
1040                    tracing::info!("Worker number monitor is stopped");
1041                    return;
1042                }
1043            }
1044
1045            let node_map = match metadata_manager.count_worker_node().await {
1046                Ok(node_map) => node_map,
1047                Err(err) => {
1048                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
1049                    continue;
1050                }
1051            };
1052
1053            // Reset metrics to clean the stale labels e.g. invalid lease ids
1054            meta_metrics.worker_num.reset();
1055            meta_metrics.meta_type.reset();
1056
1057            for (worker_type, worker_num) in node_map {
1058                meta_metrics
1059                    .worker_num
1060                    .with_label_values(&[(worker_type.as_str_name())])
1061                    .set(worker_num as i64);
1062            }
1063            if let Ok(meta_members) = election_client.get_members().await {
1064                meta_metrics
1065                    .worker_num
1066                    .with_label_values(&[WorkerType::Meta.as_str_name()])
1067                    .set(meta_members.len() as i64);
1068                meta_members.into_iter().for_each(|m| {
1069                    let role = if m.is_leader { "leader" } else { "follower" };
1070                    meta_metrics
1071                        .meta_type
1072                        .with_label_values(&[m.id.as_str(), role])
1073                        .set(1);
1074                });
1075            }
1076        }
1077    });
1078
1079    (join_handle, shutdown_tx)
1080}
1081
1082pub async fn refresh_fragment_info_metrics(
1083    catalog_controller: &CatalogControllerRef,
1084    cluster_controller: &ClusterControllerRef,
1085    hummock_manager: &HummockManagerRef,
1086    meta_metrics: Arc<MetaMetrics>,
1087) {
1088    let worker_nodes = match cluster_controller
1089        .list_workers(Some(WorkerType::ComputeNode.into()), None)
1090        .await
1091    {
1092        Ok(worker_nodes) => worker_nodes,
1093        Err(err) => {
1094            tracing::warn!(error=%err.as_report(), "fail to list worker node");
1095            return;
1096        }
1097    };
1098    let actor_locations = match catalog_controller.list_actor_locations() {
1099        Ok(actor_locations) => actor_locations,
1100        Err(err) => {
1101            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
1102            return;
1103        }
1104    };
1105    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
1106        Ok(sink_actor_mapping) => sink_actor_mapping,
1107        Err(err) => {
1108            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
1109            return;
1110        }
1111    };
1112    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
1113        Ok(fragment_state_tables) => fragment_state_tables,
1114        Err(err) => {
1115            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
1116            return;
1117        }
1118    };
1119    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
1120        Ok(mapping) => mapping,
1121        Err(err) => {
1122            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
1123            return;
1124        }
1125    };
1126
1127    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
1128        .into_iter()
1129        .map(|worker_node| {
1130            let addr = match worker_node.host {
1131                Some(host) => format!("{}:{}", host.host, host.port),
1132                None => "".to_owned(),
1133            };
1134            (worker_node.id, addr)
1135        })
1136        .collect();
1137    let table_compaction_group_id_mapping = hummock_manager
1138        .get_table_compaction_group_id_mapping()
1139        .await;
1140
1141    // Start fresh with a reset to clear all outdated labels. This is safe since we always
1142    // report full info on each interval.
1143    meta_metrics.actor_info.reset();
1144    meta_metrics.table_info.reset();
1145    meta_metrics.sink_info.reset();
1146    for actor_location in actor_locations {
1147        let actor_id_str = actor_location.actor_id.to_string();
1148        let fragment_id_str = actor_location.fragment_id.to_string();
1149        // Report a dummy gauge metrics with (fragment id, actor id, node
1150        // address) as its label
1151        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
1152            meta_metrics
1153                .actor_info
1154                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
1155                .set(1);
1156        }
1157    }
1158    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
1159        let sink_id_str = sink_id.to_string();
1160        for actor_id in actor_ids {
1161            let actor_id_str = actor_id.to_string();
1162            meta_metrics
1163                .sink_info
1164                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
1165                .set(1);
1166        }
1167    }
1168    for PartialFragmentStateTables {
1169        fragment_id,
1170        job_id,
1171        state_table_ids,
1172    } in fragment_state_tables
1173    {
1174        let fragment_id_str = fragment_id.to_string();
1175        let job_id_str = job_id.to_string();
1176        for table_id in state_table_ids.into_inner() {
1177            let table_id_str = table_id.to_string();
1178            let (table_name, table_type) = table_name_and_type_mapping
1179                .get(&table_id)
1180                .cloned()
1181                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
1182            let compaction_group_id = table_compaction_group_id_mapping
1183                .get(&table_id)
1184                .map(|cg_id| cg_id.to_string())
1185                .unwrap_or_else(|| "unknown".to_owned());
1186            meta_metrics
1187                .table_info
1188                .with_label_values(&[
1189                    &job_id_str,
1190                    &table_id_str,
1191                    &fragment_id_str,
1192                    &table_name,
1193                    &table_type,
1194                    &compaction_group_id,
1195                ])
1196                .set(1);
1197        }
1198    }
1199}
1200
1201pub async fn refresh_relation_info_metrics(
1202    catalog_controller: &CatalogControllerRef,
1203    meta_metrics: Arc<MetaMetrics>,
1204) {
1205    let table_objects = match catalog_controller.list_table_objects().await {
1206        Ok(table_objects) => table_objects,
1207        Err(err) => {
1208            tracing::warn!(error=%err.as_report(), "fail to get table objects");
1209            return;
1210        }
1211    };
1212
1213    let source_objects = match catalog_controller.list_source_objects().await {
1214        Ok(source_objects) => source_objects,
1215        Err(err) => {
1216            tracing::warn!(error=%err.as_report(), "fail to get source objects");
1217            return;
1218        }
1219    };
1220
1221    let sink_objects = match catalog_controller.list_sink_objects().await {
1222        Ok(sink_objects) => sink_objects,
1223        Err(err) => {
1224            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
1225            return;
1226        }
1227    };
1228
1229    meta_metrics.relation_info.reset();
1230
1231    for (id, db, schema, name, resource_group, table_type) in table_objects {
1232        let relation_type = match table_type {
1233            TableType::Table => "table",
1234            TableType::MaterializedView => "materialized_view",
1235            TableType::Index | TableType::VectorIndex => "index",
1236            TableType::Internal => "internal",
1237        };
1238        meta_metrics
1239            .relation_info
1240            .with_label_values(&[
1241                &id.to_string(),
1242                &db,
1243                &schema,
1244                &name,
1245                &resource_group,
1246                &relation_type.to_owned(),
1247            ])
1248            .set(1);
1249    }
1250
1251    for (id, db, schema, name, resource_group) in source_objects {
1252        meta_metrics
1253            .relation_info
1254            .with_label_values(&[
1255                &id.to_string(),
1256                &db,
1257                &schema,
1258                &name,
1259                &resource_group,
1260                &"source".to_owned(),
1261            ])
1262            .set(1);
1263    }
1264
1265    for (id, db, schema, name, resource_group) in sink_objects {
1266        meta_metrics
1267            .relation_info
1268            .with_label_values(&[
1269                &id.to_string(),
1270                &db,
1271                &schema,
1272                &name,
1273                &resource_group,
1274                &"sink".to_owned(),
1275            ])
1276            .set(1);
1277    }
1278}
1279
1280pub fn start_info_monitor(
1281    metadata_manager: MetadataManager,
1282    hummock_manager: HummockManagerRef,
1283    system_params_controller: SystemParamsControllerRef,
1284    meta_metrics: Arc<MetaMetrics>,
1285) -> (JoinHandle<()>, Sender<()>) {
1286    const COLLECT_INTERVAL_SECONDS: u64 = 60;
1287
1288    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1289    let join_handle = tokio::spawn(async move {
1290        let mut monitor_interval =
1291            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
1292        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1293        loop {
1294            tokio::select! {
1295                // Wait for interval
1296                _ = monitor_interval.tick() => {},
1297                // Shutdown monitor
1298                _ = &mut shutdown_rx => {
1299                    tracing::info!("Meta info monitor is stopped");
1300                    return;
1301                }
1302            }
1303
1304            // Fragment and relation info
1305            refresh_fragment_info_metrics(
1306                &metadata_manager.catalog_controller,
1307                &metadata_manager.cluster_controller,
1308                &hummock_manager,
1309                meta_metrics.clone(),
1310            )
1311            .await;
1312
1313            refresh_relation_info_metrics(
1314                &metadata_manager.catalog_controller,
1315                meta_metrics.clone(),
1316            )
1317            .await;
1318
1319            // System parameter info
1320            refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
1321                .await;
1322        }
1323    });
1324
1325    (join_handle, shutdown_tx)
1326}