risingwave_meta/rpc/metrics.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicF64, GenericGaugeVec};
use prometheus::{
    GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use risingwave_common::catalog::{FragmentTypeFlag, TableId};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
    LabelGuardedUintGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_source_backfill, visit_stream_node_stream_scan,
};
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
};
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::{ObjectId, WorkerId};
use risingwave_object_store::object::object_metrics::{
    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
};
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::FragmentDistribution;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::ClusterControllerRef;
use crate::controller::system_param::SystemParamsControllerRef;
use crate::controller::utils::PartialFragmentStateTables;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::rpc::ElectionClientRef;

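/// Backfill metadata extracted from a fragment's [`FragmentDistribution`],
/// used to build the labels of the `backfill_fragment_progress` gauge.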
struct BackfillFragmentInfo {
    job_id: u32,
    fragment_id: u32,
    backfill_state_table_id: u32,
    backfill_target_relation_id: u32,
    backfill_type: &'static str,
    backfill_epoch: u64,
}

#[derive(Clone)]
pub struct MetaMetrics {
    // ********************************** Meta ************************************
    /// The number of workers in the cluster.
    pub worker_num: IntGaugeVec,
    /// The roles of all meta nodes in the cluster.
    pub meta_type: IntGaugeVec,

    // ********************************** gRPC ************************************
    /// gRPC latency of meta services.
    pub grpc_latency: HistogramVec,

    // ********************************** Barrier ************************************
    /// The duration from barrier injection to commit.
    /// It is the sum of inflight-latency, sync-latency and wait-commit-latency.
    pub barrier_latency: LabelGuardedHistogramVec,
    /// The duration from barrier completion to commit.
    pub barrier_wait_commit_latency: Histogram,
    /// Latency between each barrier send.
    pub barrier_send_latency: LabelGuardedHistogramVec,
    /// The number of all barriers. It is the sum of barriers that are in-flight or completed but
    /// waiting for other barriers.
    pub all_barrier_nums: LabelGuardedIntGaugeVec,
    /// The number of in-flight barriers.
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
    /// The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.
    pub last_committed_barrier_time: IntGaugeVec,
    /// The barrier interval of each database.
    pub barrier_interval_by_database: GaugeVec,

    // ********************************** Snapshot Backfill ***************************
    /// The barrier latency in seconds of `table_id` and snapshot backfill `barrier_type`.
    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec, // (table_id, barrier_type)
    /// The latency of commit epoch of `table_id`.
    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec, // (table_id, )
    /// The lag between the upstream epoch and the downstream epoch.
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec, // (table_id, )
    /// The number of in-flight barriers of `table_id`.
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec, // (table_id, )

    // ********************************** Recovery ************************************
    pub recovery_failure_cnt: IntCounterVec,
    pub recovery_latency: HistogramVec,

    // ********************************** Hummock ************************************
    /// Max committed epoch.
    pub max_committed_epoch: IntGauge,
    /// Min committed epoch.
    pub min_committed_epoch: IntGauge,
    /// The number of SSTs in each level.
    pub level_sst_num: IntGaugeVec,
    /// The number of SSTs to be merged to the next level in each level.
    pub level_compact_cnt: IntGaugeVec,
    /// The number of compact tasks.
    pub compact_frequency: IntCounterVec,
    /// Size of each level.
    pub level_file_size: IntGaugeVec,
    /// Hummock version size.
    pub version_size: IntGauge,
    /// The version id of the current version.
    pub current_version_id: IntGauge,
    /// The version id of the checkpoint version.
    pub checkpoint_version_id: IntGauge,
    /// The smallest version id that is being pinned by worker nodes.
    pub min_pinned_version_id: IntGauge,
    /// The smallest version id that is being guarded by meta node safe points.
    pub min_safepoint_version_id: IntGauge,
    /// Compaction groups that are in write-stop state.
    pub write_stop_compaction_groups: IntGaugeVec,
    /// The number of attempts to trigger full GC.
    pub full_gc_trigger_count: IntGauge,
    /// The number of candidate objects to delete after scanning the object store.
    pub full_gc_candidate_object_count: Histogram,
    /// The number of objects to delete after filtering by the meta node.
    pub full_gc_selected_object_count: Histogram,
    /// Hummock version stats.
    pub version_stats: IntGaugeVec,
    /// Per materialized view stats in the current hummock version.
    pub materialized_view_stats: IntGaugeVec,
    /// Total number of objects that are no longer referenced by versions.
    pub stale_object_count: IntGauge,
    /// Total size of objects that are no longer referenced by versions.
    pub stale_object_size: IntGauge,
    /// Total number of objects that are still referenced by non-current versions.
    pub old_version_object_count: IntGauge,
    /// Total size of objects that are still referenced by non-current versions.
    pub old_version_object_size: IntGauge,
    /// Total number of objects that are referenced by time travel.
    pub time_travel_object_count: IntGauge,
    /// Total number of objects that are referenced by the current version.
    pub current_version_object_count: IntGauge,
    /// Total size of objects that are referenced by the current version.
    pub current_version_object_size: IntGauge,
    /// Total number of objects, including dangling objects.
    pub total_object_count: IntGauge,
    /// Total size of objects, including dangling objects.
    pub total_object_size: IntGauge,
    /// Number of objects per table change log.
    pub table_change_log_object_count: IntGaugeVec,
    /// Size of objects per table change log.
    pub table_change_log_object_size: IntGaugeVec,
    /// The number of hummock version delta logs.
    pub delta_log_count: IntGauge,
    /// Latency of version checkpoint.
    pub version_checkpoint_latency: Histogram,
    /// Latency for the hummock manager to acquire a lock.
    pub hummock_manager_lock_time: HistogramVec,
    /// Latency for the hummock manager to actually process a request after acquiring the lock.
    pub hummock_manager_real_process_time: HistogramVec,
    /// The number of compactions from one level to another level that have been skipped.
    pub compact_skip_frequency: IntCounterVec,
    /// Bytes of the LSM tree needed to reach balance.
    pub compact_pending_bytes: IntGaugeVec,
    /// Per level compression ratio.
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    /// Per level number of running compaction tasks.
    pub level_compact_task_cnt: IntGaugeVec,
    pub time_after_last_observation: Arc<AtomicU64>,
    pub l0_compact_level_count: HistogramVec,
    pub compact_task_size: HistogramVec,
    pub compact_task_file_count: HistogramVec,
    pub compact_task_batch_count: HistogramVec,
    pub split_compaction_group_count: IntCounterVec,
    pub state_table_count: IntGaugeVec,
    pub branched_sst_count: IntGaugeVec,
    pub compact_task_trivial_move_sst_count: HistogramVec,

    pub compaction_event_consumed_latency: Histogram,
    pub compaction_event_loop_iteration_latency: Histogram,
    pub time_travel_vacuum_metadata_latency: Histogram,
    pub time_travel_write_metadata_latency: Histogram,

    // ********************************** Object Store ************************************
    // Object store related metrics (for backup/restore and version checkpoint).
    pub object_store_metric: Arc<ObjectStoreMetrics>,

    // ********************************** Source ************************************
    /// Whether each source is still up.
    pub source_is_up: LabelGuardedIntGaugeVec,
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

    // ********************************** Fragment ************************************
    /// A dummy gauge metric whose labels carry the mapping from actor id to fragment id.
    pub actor_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry the mapping from table id to actor id.
    pub table_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry the mapping from actor id to sink id.
    pub sink_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry relation info.
    pub relation_info: IntGaugeVec,
    /// Backfill progress per fragment.
    pub backfill_fragment_progress: IntGaugeVec,

    // ********************************** System Params ************************************
    /// A dummy gauge metric with labels carrying system parameter info.
    /// Labels: (name, value)
    pub system_param_info: IntGaugeVec,

    /// Write throughput of commit epoch for each state table.
    pub table_write_throughput: IntCounterVec,

    /// The number of compaction group merges that have been triggered.
    pub merge_compaction_group_count: IntCounterVec,

    // ********************************** Auto Schema Change ************************************
    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_latency: LabelGuardedHistogramVec,

    pub time_travel_version_replay_latency: Histogram,

    pub compaction_group_count: IntGauge,
    pub compaction_group_size: IntGaugeVec,
    pub compaction_group_file_count: IntGaugeVec,
    pub compaction_group_throughput: IntGaugeVec,

    // ********************************** Refresh Manager ************************************
    pub refresh_job_duration: LabelGuardedUintGaugeVec,
    pub refresh_job_finish_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_trigger_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_miss_cnt: LabelGuardedIntCounterVec,
}

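/// Process-wide [`MetaMetrics`] instance, registered against the global
/// metrics registry on first access.
///
/// Illustrative usage (a sketch; the gRPC path label below is hypothetical):
///
/// ```ignore
/// GLOBAL_META_METRICS
///     .grpc_latency
///     .with_label_values(&["/meta.ClusterService/AddWorkerNode"])
///     .observe(0.003);
/// ```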
pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl MetaMetrics {
    fn new(registry: &Registry) -> Self {
        let opts = histogram_opts!(
            "meta_grpc_duration_seconds",
            "gRPC latency of meta services",
            exponential_buckets(0.0001, 2.0, 20).unwrap() // max 52s
        );
        let grpc_latency =
            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_duration_seconds",
            "barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let barrier_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();

        let opts = histogram_opts!(
            "meta_barrier_wait_commit_duration_seconds",
            "barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let barrier_wait_commit_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_send_duration_seconds",
            "barrier send latency",
            exponential_buckets(0.1, 1.5, 19).unwrap() // max 148s
        );
        let barrier_send_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();
        let barrier_interval_by_database = register_gauge_vec_with_registry!(
            "meta_barrier_interval_by_database",
            "barrier interval of each database",
            &["database_id"],
            registry
        )
        .unwrap();

        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "all_barrier_nums",
            "num of all barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "in_flight_barrier_nums",
            "num of in-flight barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
            "last_committed_barrier_time",
            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
            &["database_id"],
            registry
        )
        .unwrap();

        // snapshot backfill metrics
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_duration_seconds",
            "snapshot backfill barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "barrier_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
            "snapshot backfill barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let snapshot_backfill_wait_commit_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();

        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_upstream_lag",
            "snapshot backfill upstream_lag",
            &["table_id"],
            registry
        )
        .unwrap();
        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_inflight_barrier_num",
            "snapshot backfill inflight_barrier_num",
            &["table_id"],
            registry
        )
        .unwrap();

        let max_committed_epoch = register_int_gauge_with_registry!(
            "storage_max_committed_epoch",
            "max committed epoch",
            registry
        )
        .unwrap();

        let min_committed_epoch = register_int_gauge_with_registry!(
            "storage_min_committed_epoch",
            "min committed epoch",
            registry
        )
        .unwrap();

        let level_sst_num = register_int_gauge_vec_with_registry!(
            "storage_level_sst_num",
            "num of SSTs in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let level_compact_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_cnt",
            "num of SSTs to be merged to next level in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let compact_frequency = register_int_counter_vec_with_registry!(
            "storage_level_compact_frequency",
            "The number of compactions from one level to another level that have completed or failed.",
            &["compactor", "group", "task_type", "result"],
            registry
        )
        .unwrap();
        let compact_skip_frequency = register_int_counter_vec_with_registry!(
            "storage_skip_compact_frequency",
            "The number of compactions from one level to another level that have been skipped.",
            &["level", "type"],
            registry
        )
        .unwrap();

        let version_size =
            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
                .unwrap();

        let current_version_id = register_int_gauge_with_registry!(
            "storage_current_version_id",
            "current version id",
            registry
        )
        .unwrap();

        let checkpoint_version_id = register_int_gauge_with_registry!(
            "storage_checkpoint_version_id",
            "checkpoint version id",
            registry
        )
        .unwrap();

        let min_pinned_version_id = register_int_gauge_with_registry!(
            "storage_min_pinned_version_id",
            "min pinned version id",
            registry
        )
        .unwrap();

        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
            "storage_write_stop_compaction_groups",
            "compaction groups in write-stop state",
            &["compaction_group_id"],
            registry
        )
        .unwrap();

        let full_gc_trigger_count = register_int_gauge_with_registry!(
            "storage_full_gc_trigger_count",
            "the number of attempts to trigger full GC",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_candidate_object_count",
            "the number of candidate objects to delete after scanning the object store",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_candidate_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_selected_object_count",
            "the number of objects to delete after filtering by the meta node",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_selected_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let min_safepoint_version_id = register_int_gauge_with_registry!(
            "storage_min_safepoint_version_id",
            "min safepoint version id",
            registry
        )
        .unwrap();

        let level_file_size = register_int_gauge_vec_with_registry!(
            "storage_level_total_file_size",
            "total file size (KB) of each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let version_stats = register_int_gauge_vec_with_registry!(
            "storage_version_stats",
            "per table stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let materialized_view_stats = register_int_gauge_vec_with_registry!(
            "storage_materialized_view_stats",
            "per materialized view stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let stale_object_count = register_int_gauge_with_registry!(
            "storage_stale_object_count",
            "total number of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let stale_object_size = register_int_gauge_with_registry!(
            "storage_stale_object_size",
            "total size of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let old_version_object_count = register_int_gauge_with_registry!(
            "storage_old_version_object_count",
            "total number of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let old_version_object_size = register_int_gauge_with_registry!(
            "storage_old_version_object_size",
            "total size of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let current_version_object_count = register_int_gauge_with_registry!(
            "storage_current_version_object_count",
            "total number of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let current_version_object_size = register_int_gauge_with_registry!(
            "storage_current_version_object_size",
            "total size of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let total_object_count = register_int_gauge_with_registry!(
            "storage_total_object_count",
            "Total number of objects, including dangling objects. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let total_object_size = register_int_gauge_with_registry!(
            "storage_total_object_size",
            "Total size of objects, including dangling objects. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_count",
            "per table change log object count",
            &["table_id"],
            registry
        )
        .unwrap();

        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_size",
            "per table change log object size",
            &["table_id"],
            registry
        )
        .unwrap();

        let time_travel_object_count = register_int_gauge_with_registry!(
            "storage_time_travel_object_count",
            "total number of objects that are referenced by time travel.",
            registry
        )
        .unwrap();

        let delta_log_count = register_int_gauge_with_registry!(
            "storage_delta_log_count",
            "total number of hummock version delta logs",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_version_checkpoint_latency",
            "hummock version checkpoint latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();

        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
            "hummock_manager_lock_time",
            "latency for hummock manager to acquire the rwlock",
            &["lock_name", "lock_type"],
            registry
        )
        .unwrap();

        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
            "meta_hummock_manager_real_process_time",
            "latency for hummock manager to actually process the request",
            &["method"],
            registry
        )
        .unwrap();

        let worker_num = register_int_gauge_vec_with_registry!(
            "worker_num",
            "number of nodes in the cluster",
            &["worker_type"],
            registry,
        )
        .unwrap();

        let meta_type = register_int_gauge_vec_with_registry!(
            "meta_num",
            "role of meta nodes in the cluster",
            &["worker_addr", "role"],
            registry,
        )
        .unwrap();

        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
            "storage_compact_pending_bytes",
            "bytes of lsm tree needed to reach balance",
            &["group"],
            registry
        )
        .unwrap();

        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
            "storage_compact_level_compression_ratio",
            "compression ratio of each level of the lsm tree",
            &["group", "level", "algorithm"],
            registry
        )
        .unwrap();

        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_task_cnt",
            "num of compact_task organized by group and level",
            &["task"],
            registry
        )
        .unwrap();

        let time_travel_vacuum_metadata_latency = register_histogram_with_registry!(
            histogram_opts!(
                "storage_time_travel_vacuum_metadata_latency",
                "Latency of vacuuming metadata for time travel",
                exponential_buckets(0.1, 1.5, 20).unwrap()
            ),
            registry
        )
        .unwrap();
        let time_travel_write_metadata_latency = register_histogram_with_registry!(
            histogram_opts!(
                "storage_time_travel_write_metadata_latency",
                "Latency of writing metadata for time travel",
                exponential_buckets(0.1, 1.5, 20).unwrap()
            ),
            registry
        )
        .unwrap();

        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());

        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
            "recovery_failure_cnt",
            "Number of failed recovery attempts",
            &["recovery_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "recovery_latency",
            "Latency of the recovery process",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let recovery_latency =
            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();

        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_failure_cnt",
            "Number of failed auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_success_cnt",
            "Number of successful auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "auto_schema_change_latency",
            "Latency of the auto schema change process",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
            "source_status_is_up",
            "source is up or not",
            &["source_id", "source_name"],
            registry
        )
        .unwrap();
        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());

        let actor_info = register_int_gauge_vec_with_registry!(
            "actor_info",
            "Mapping from actor id to (fragment id, compute node)",
            &["actor_id", "fragment_id", "compute_node"],
            registry
        )
        .unwrap();

        let table_info = register_int_gauge_vec_with_registry!(
            "table_info",
            "Mapping from table id to (materialized view id, fragment id, table name, table type, compaction group id)",
            &[
                "materialized_view_id",
                "table_id",
                "fragment_id",
                "table_name",
                "table_type",
                "compaction_group_id"
            ],
            registry
        )
        .unwrap();

        let sink_info = register_int_gauge_vec_with_registry!(
            "sink_info",
            "Mapping from actor id to (sink id, sink name)",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let relation_info = register_int_gauge_vec_with_registry!(
            "relation_info",
            "Information of the database relation (table/source/sink/materialized view/index/internal)",
            &["id", "database", "schema", "name", "resource_group", "type"],
            registry
        )
        .unwrap();

        let backfill_fragment_progress = register_int_gauge_vec_with_registry!(
            "backfill_fragment_progress",
            "Backfill progress per fragment",
            &[
                "job_id",
                "fragment_id",
                "backfill_state_table_id",
                "backfill_target_relation_id",
                "backfill_target_relation_name",
                "backfill_target_relation_type",
                "backfill_type",
                "backfill_epoch",
                "upstream_type",
                "backfill_progress",
            ],
            registry
        )
        .unwrap();

        let l0_compact_level_count = register_histogram_vec_with_registry!(
            "storage_l0_compact_level_count",
            "level_count of l0 compact task",
            &["group", "type"],
            registry
        )
        .unwrap();

        // System parameter info
        let system_param_info = register_int_gauge_vec_with_registry!(
            "system_param_info",
            "Information of system parameters",
            &["name", "value"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_size",
            "Total size of compact tasks that have been issued to the state store",
            exponential_buckets(1048576.0, 2.0, 16).unwrap()
        );

        let compact_task_size =
            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();

        let compact_task_file_count = register_histogram_vec_with_registry!(
            "storage_compact_task_file_count",
            "file count of compact task",
            &["group", "type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "storage_compact_task_batch_count",
            "count of compact task batch",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_batch_count =
            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();

        let table_write_throughput = register_int_counter_vec_with_registry!(
            "storage_commit_write_throughput",
            "The write throughput of commit epoch for each state table.",
            &["table_id"],
            registry
        )
        .unwrap();

        let split_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_split_compaction_group_count",
            "Count of triggered compaction group splits",
            &["group"],
            registry
        )
        .unwrap();

        let state_table_count = register_int_gauge_vec_with_registry!(
            "storage_state_table_count",
            "Count of state tables per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let branched_sst_count = register_int_gauge_vec_with_registry!(
            "storage_branched_sst_count",
            "Count of branched SSTs per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_consumed_latency",
            "The latency (ms) of each event being consumed",
            exponential_buckets(1.0, 1.5, 30).unwrap() // max 128s
        );
        let compaction_event_consumed_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_loop_iteration_latency",
            "The latency (ms) of each iteration of the compaction event loop",
            exponential_buckets(1.0, 1.5, 30).unwrap() // max 128s
        );
        let compaction_event_loop_iteration_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_merge_compaction_group_count",
            "Count of triggered compaction group merges",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_time_travel_version_replay_latency",
            "The latency (ms) of replaying a hummock version for time travel",
            exponential_buckets(0.01, 10.0, 6).unwrap()
        );
        let time_travel_version_replay_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let compaction_group_count = register_int_gauge_with_registry!(
            "storage_compaction_group_count",
            "The number of compaction groups",
            registry,
        )
        .unwrap();

        let compaction_group_size = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_size",
            "The size of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_file_count",
            "The file count of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_throughput",
            "The throughput of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_trivial_move_sst_count",
            "sst count of compact trivial-move task",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_trivial_move_sst_count =
            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();

        let refresh_job_duration = register_guarded_uint_gauge_vec_with_registry!(
            "meta_refresh_job_duration",
            "The duration of refresh jobs",
            &["table_id", "status"],
            registry
        )
        .unwrap();
        let refresh_job_finish_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_job_finish_cnt",
            "The number of finished refresh jobs",
            &["table_id", "status"],
            registry
        )
        .unwrap();
        let refresh_cron_job_trigger_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_cron_job_trigger_cnt",
            "The number of cron refresh jobs triggered",
            &["table_id"],
            registry
        )
        .unwrap();
        let refresh_cron_job_miss_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_cron_job_miss_cnt",
            "The number of cron refresh jobs missed",
            &["table_id"],
            registry
        )
        .unwrap();

        Self {
            grpc_latency,
            barrier_latency,
            barrier_wait_commit_latency,
            barrier_send_latency,
            all_barrier_nums,
            in_flight_barrier_nums,
            last_committed_barrier_time,
            barrier_interval_by_database,
            snapshot_backfill_barrier_latency,
            snapshot_backfill_wait_commit_latency,
            snapshot_backfill_lag,
            snapshot_backfill_inflight_barrier_num,
            recovery_failure_cnt,
            recovery_latency,

            max_committed_epoch,
            min_committed_epoch,
            level_sst_num,
            level_compact_cnt,
            compact_frequency,
            compact_skip_frequency,
            level_file_size,
            version_size,
            version_stats,
            materialized_view_stats,
            stale_object_count,
            stale_object_size,
            old_version_object_count,
            old_version_object_size,
            time_travel_object_count,
            current_version_object_count,
            current_version_object_size,
            total_object_count,
            total_object_size,
            table_change_log_object_count,
            table_change_log_object_size,
            delta_log_count,
            version_checkpoint_latency,
            current_version_id,
            checkpoint_version_id,
            min_pinned_version_id,
            min_safepoint_version_id,
            write_stop_compaction_groups,
            full_gc_trigger_count,
            full_gc_candidate_object_count,
            full_gc_selected_object_count,
            hummock_manager_lock_time,
            hummock_manager_real_process_time,
            time_after_last_observation: Arc::new(AtomicU64::new(0)),
            worker_num,
            meta_type,
            compact_pending_bytes,
            compact_level_compression_ratio,
            level_compact_task_cnt,
            object_store_metric,
            source_is_up,
            source_enumerator_metrics,
            actor_info,
            table_info,
            sink_info,
            relation_info,
            backfill_fragment_progress,
            system_param_info,
            l0_compact_level_count,
            compact_task_size,
            compact_task_file_count,
            compact_task_batch_count,
            compact_task_trivial_move_sst_count,
            table_write_throughput,
            split_compaction_group_count,
            state_table_count,
            branched_sst_count,
            compaction_event_consumed_latency,
            compaction_event_loop_iteration_latency,
            auto_schema_change_failure_cnt,
            auto_schema_change_success_cnt,
            auto_schema_change_latency,
            merge_compaction_group_count,
            time_travel_version_replay_latency,
            compaction_group_count,
            compaction_group_size,
            compaction_group_file_count,
            compaction_group_throughput,
            refresh_job_duration,
            refresh_job_finish_cnt,
            refresh_cron_job_trigger_cnt,
            refresh_cron_job_miss_cnt,
            time_travel_vacuum_metadata_latency,
            time_travel_write_metadata_latency,
        }
    }

    #[cfg(test)]
    pub fn for_test(registry: &Registry) -> Self {
        Self::new(registry)
    }
}

impl Default for MetaMetrics {
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}

/// Refresh `system_param_info` metrics by reading current system parameters.
pub async fn refresh_system_param_info_metrics(
    system_params_controller: &SystemParamsControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let params_info = system_params_controller.get_params().await.get_all();

    meta_metrics.system_param_info.reset();
    for info in params_info {
        meta_metrics
            .system_param_info
            .with_label_values(&[info.name, &info.value])
            .set(1);
    }
}
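
// `system_param_info` is an "info"-style gauge: the sample value is always 1
// and the data lives in the `(name, value)` label pair. A hedged sketch of how
// a dashboard might list the current parameters (PromQL, for illustration):
//
//     max by (name, value) (system_param_info)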

pub fn start_worker_info_monitor(
    metadata_manager: MetadataManager,
    election_client: ElectionClientRef,
    interval: Duration,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval = tokio::time::interval(interval);
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for interval
                _ = monitor_interval.tick() => {},
                // Shutdown monitor
                _ = &mut shutdown_rx => {
                    tracing::info!("Worker number monitor is stopped");
                    return;
                }
            }

            let node_map = match metadata_manager.count_worker_node().await {
                Ok(node_map) => node_map,
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
                    continue;
                }
            };

            // Reset metrics to clean up stale labels, e.g. invalid lease ids
            meta_metrics.worker_num.reset();
            meta_metrics.meta_type.reset();

            for (worker_type, worker_num) in node_map {
                meta_metrics
                    .worker_num
                    .with_label_values(&[(worker_type.as_str_name())])
                    .set(worker_num as i64);
            }
            if let Ok(meta_members) = election_client.get_members().await {
                meta_metrics
                    .worker_num
                    .with_label_values(&[WorkerType::Meta.as_str_name()])
                    .set(meta_members.len() as i64);
                meta_members.into_iter().for_each(|m| {
                    let role = if m.is_leader { "leader" } else { "follower" };
                    meta_metrics
                        .meta_type
                        .with_label_values(&[m.id.as_str(), role])
                        .set(1);
                });
            }
        }
    });

    (join_handle, shutdown_tx)
}
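
// Caller-side sketch (illustrative only; the bindings below are assumed to be
// in scope): the returned pair follows the usual tokio shutdown pattern.
//
//     let (handle, shutdown) = start_worker_info_monitor(
//         metadata_manager, election_client, Duration::from_secs(15), meta_metrics);
//     // ... on shutdown:
//     let _ = shutdown.send(());
//     let _ = handle.await;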

pub async fn refresh_fragment_info_metrics(
    catalog_controller: &CatalogControllerRef,
    cluster_controller: &ClusterControllerRef,
    hummock_manager: &HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let worker_nodes = match cluster_controller
        .list_workers(Some(WorkerType::ComputeNode.into()), None)
        .await
    {
        Ok(worker_nodes) => worker_nodes,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list worker node");
            return;
        }
    };
    let actor_locations = match catalog_controller.list_actor_locations() {
        Ok(actor_locations) => actor_locations,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
            return;
        }
    };
    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
        Ok(sink_actor_mapping) => sink_actor_mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
            return;
        }
    };
    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
        Ok(fragment_state_tables) => fragment_state_tables,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
            return;
        }
    };
    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
        Ok(mapping) => mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
            return;
        }
    };

    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
        .into_iter()
        .map(|worker_node| {
            let addr = match worker_node.host {
                Some(host) => format!("{}:{}", host.host, host.port),
                None => "".to_owned(),
            };
            (worker_node.id, addr)
        })
        .collect();
    let table_compaction_group_id_mapping = hummock_manager
        .get_table_compaction_group_id_mapping()
        .await;

    // Start fresh with a reset to clear all outdated labels. This is safe since we always
    // report full info on each interval.
    meta_metrics.actor_info.reset();
    meta_metrics.table_info.reset();
    meta_metrics.sink_info.reset();
    for actor_location in actor_locations {
        let actor_id_str = actor_location.actor_id.to_string();
        let fragment_id_str = actor_location.fragment_id.to_string();
        // Report a dummy gauge metric with (actor id, fragment id, node
        // address) as its label
        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
            meta_metrics
                .actor_info
                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
                .set(1);
        }
    }
    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
        let sink_id_str = sink_id.to_string();
        for actor_id in actor_ids {
            let actor_id_str = actor_id.to_string();
            meta_metrics
                .sink_info
                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
                .set(1);
        }
    }
    for PartialFragmentStateTables {
        fragment_id,
        job_id,
        state_table_ids,
    } in fragment_state_tables
    {
        let fragment_id_str = fragment_id.to_string();
        let job_id_str = job_id.to_string();
        for table_id in state_table_ids.into_inner() {
            let table_id_str = table_id.to_string();
            let (table_name, table_type) = table_name_and_type_mapping
                .get(&table_id)
                .cloned()
                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
            let compaction_group_id = table_compaction_group_id_mapping
                .get(&table_id)
                .map(|cg_id| cg_id.to_string())
                .unwrap_or_else(|| "unknown".to_owned());
            meta_metrics
                .table_info
                .with_label_values(&[
                    &job_id_str,
                    &table_id_str,
                    &fragment_id_str,
                    &table_name,
                    &table_type,
                    &compaction_group_id,
                ])
                .set(1);
        }
    }
}
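
// These info gauges always report 1; their labels are meant to be joined onto
// real per-actor/per-table series at query time. A hedged PromQL sketch (the
// left-hand metric name is hypothetical):
//
//     some_actor_metric * on (actor_id)
//         group_left (fragment_id, compute_node) actor_info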

pub async fn refresh_relation_info_metrics(
    catalog_controller: &CatalogControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let table_objects = match catalog_controller.list_table_objects().await {
        Ok(table_objects) => table_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table objects");
            return;
        }
    };

    let source_objects = match catalog_controller.list_source_objects().await {
        Ok(source_objects) => source_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get source objects");
            return;
        }
    };

    let sink_objects = match catalog_controller.list_sink_objects().await {
        Ok(sink_objects) => sink_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
            return;
        }
    };

    meta_metrics.relation_info.reset();

    for (id, db, schema, name, resource_group, table_type) in table_objects {
        let relation_type = match table_type {
            TableType::Table => "table",
            TableType::MaterializedView => "materialized_view",
            TableType::Index | TableType::VectorIndex => "index",
            TableType::Internal => "internal",
        };
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                &relation_type.to_owned(),
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in source_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                &"source".to_owned(),
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in sink_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                &"sink".to_owned(),
            ])
            .set(1);
    }
}

fn extract_backfill_fragment_info(
    distribution: &FragmentDistribution,
) -> Option<BackfillFragmentInfo> {
    let backfill_type =
        if distribution.fragment_type_mask & FragmentTypeFlag::SourceScan as u32 != 0 {
            "SOURCE"
        } else if distribution.fragment_type_mask
            & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32
                | FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32)
            != 0
        {
            "SNAPSHOT_BACKFILL"
        } else if distribution.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0 {
            "ARRANGEMENT_OR_NO_SHUFFLE"
        } else {
            return None;
        };

    let stream_node = distribution.node.as_ref()?;
    let mut info = None;
    match backfill_type {
        "SOURCE" => {
            visit_stream_node_source_backfill(stream_node, |node| {
                info = Some(BackfillFragmentInfo {
                    job_id: distribution.table_id.as_raw_id(),
                    fragment_id: distribution.fragment_id.as_raw_id(),
                    backfill_state_table_id: node
                        .state_table
                        .as_ref()
                        .map(|table| table.id.as_raw_id())
                        .unwrap_or_default(),
                    backfill_target_relation_id: node.upstream_source_id.as_raw_id(),
                    backfill_type,
                    backfill_epoch: 0,
                });
            });
        }
        "SNAPSHOT_BACKFILL" | "ARRANGEMENT_OR_NO_SHUFFLE" => {
            visit_stream_node_stream_scan(stream_node, |node| {
                info = Some(BackfillFragmentInfo {
                    job_id: distribution.table_id.as_raw_id(),
                    fragment_id: distribution.fragment_id.as_raw_id(),
                    backfill_state_table_id: node
                        .state_table
                        .as_ref()
                        .map(|table| table.id.as_raw_id())
                        .unwrap_or_default(),
                    backfill_target_relation_id: node.table_id.as_raw_id(),
                    backfill_type,
                    backfill_epoch: node.snapshot_backfill_epoch.unwrap_or_default(),
                });
            });
        }
        _ => {}
    }

    info
}
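
// `fragment_type_mask` is a bitset of `FragmentTypeFlag`s, so a fragment can
// carry several roles at once; the chain above simply tests the bits in
// priority order. An illustrative sketch (assumes the `Mview` flag variant):
//
//     let mask = FragmentTypeFlag::SourceScan as u32 | FragmentTypeFlag::Mview as u32;
//     assert!(mask & FragmentTypeFlag::SourceScan as u32 != 0);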
1375
pub async fn refresh_backfill_progress_metrics(
    catalog_controller: &CatalogControllerRef,
    hummock_manager: &HummockManagerRef,
    barrier_manager: &BarrierManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let fragment_descs = match catalog_controller.list_fragment_descs_with_node(true).await {
        Ok(fragment_descs) => fragment_descs,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "failed to list creating fragment descs");
            return;
        }
    };

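    // Index backfill fragments by (job_id, fragment_id) so the progress reports
    // fetched below can be joined back per fragment.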
    let backfill_infos: HashMap<(u32, u32), BackfillFragmentInfo> = fragment_descs
        .iter()
        .filter_map(|(distribution, _)| extract_backfill_fragment_info(distribution))
        .map(|info| ((info.job_id, info.fragment_id), info))
        .collect();

    let fragment_progresses = match barrier_manager.get_fragment_backfill_progress().await {
        Ok(progress) => progress,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "failed to get fragment backfill progress");
            return;
        }
    };

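    // Key the reported progress by (job_id, fragment_id) for direct lookup.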
    let progress_by_fragment: HashMap<(u32, u32), _> = fragment_progresses
        .into_iter()
        .map(|progress| {
            (
                (
                    progress.job_id.as_raw_id(),
                    progress.fragment_id.as_raw_id(),
                ),
                progress,
            )
        })
        .collect();

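    // Per-table total key counts from the latest Hummock version serve as the
    // denominator when rendering percentage progress below.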
    let version_stats = hummock_manager.get_version_stats().await;

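    // Resolve the backfill targets' catalog info (database, schema, name, type)
    // to produce human-readable metric labels.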
    let relation_ids: HashSet<_> = backfill_infos
        .values()
        .map(|info| ObjectId::new(info.backfill_target_relation_id))
        .collect();
    let relation_objects = match catalog_controller
        .list_relation_objects_by_ids(&relation_ids)
        .await
    {
        Ok(relation_objects) => relation_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "failed to get relation objects");
            return;
        }
    };

    let mut relation_info = HashMap::new();
    for (id, db, schema, name, rel_type) in relation_objects {
        relation_info.insert(id.as_raw_id(), (db, schema, name, rel_type));
    }

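    // Reset first so that gauges for fragments that finished backfilling since
    // the last tick are dropped instead of lingering at their final value.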
    meta_metrics.backfill_fragment_progress.reset();

    for info in backfill_infos.values() {
        let progress = progress_by_fragment.get(&(info.job_id, info.fragment_id));
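        // The target relation may no longer exist in the catalog (e.g. it was
        // dropped in the meantime); fall back to placeholder labels.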
        let (db, schema, name, rel_type) = relation_info
            .get(&info.backfill_target_relation_id)
            .cloned()
            .unwrap_or_else(|| {
                (
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                )
            });

        let job_id_str = info.job_id.to_string();
        let fragment_id_str = info.fragment_id.to_string();
        let backfill_state_table_id_str = info.backfill_state_table_id.to_string();
        let backfill_target_relation_id_str = info.backfill_target_relation_id.to_string();
        let backfill_target_relation_name_str = format!("{db}.{schema}.{name}");
        let backfill_target_relation_type_str = rel_type;
        let backfill_type_str = info.backfill_type.to_owned();
        let backfill_epoch_str = info.backfill_epoch.to_string();
        let total_key_count = version_stats
            .table_stats
            .get(&TableId::new(info.backfill_target_relation_id))
            .map(|stats| stats.total_key_count as u64);

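        // Render human-readable progress: source backfill reports raw consumed
        // rows, while scan-based backfill reports a percentage of the upstream's
        // total key count. A zero total with progress reported means the
        // upstream is empty, so the backfill is trivially complete.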
        let progress_label = match (info.backfill_type, progress) {
            ("SOURCE", Some(progress)) => format!("{} consumed rows", progress.consumed_rows),
            ("SOURCE", None) => "0 consumed rows".to_owned(),
            (_, Some(progress)) if progress.done => {
                let total = total_key_count.unwrap_or(0);
                format!("100.0000% ({}/{})", total, total)
            }
            (_, Some(progress)) => {
                let total = total_key_count.unwrap_or(0);
                if total == 0 {
                    "100.0000% (0/0)".to_owned()
                } else {
                    let raw = (progress.consumed_rows as f64) / (total as f64) * 100.0;
                    format!(
                        "{:.4}% ({}/{})",
                        raw.min(100.0),
                        progress.consumed_rows,
                        total
                    )
                }
            }
            (_, None) => "0.0000% (0/0)".to_owned(),
        };
        let upstream_type_str = progress
            .map(|progress| progress.upstream_type.to_string())
            .unwrap_or_else(|| "Unknown".to_owned());

        meta_metrics
            .backfill_fragment_progress
            .with_label_values(&[
                &job_id_str,
                &fragment_id_str,
                &backfill_state_table_id_str,
                &backfill_target_relation_id_str,
                &backfill_target_relation_name_str,
                &backfill_target_relation_type_str,
                &backfill_type_str,
                &backfill_epoch_str,
                &upstream_type_str,
                &progress_label,
            ])
            .set(1);
    }
}

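/// Spawns a background task that refreshes the info metrics above (fragment,
/// relation, backfill progress, and system parameter info) every
/// `COLLECT_INTERVAL_SECONDS`. Returns the task's `JoinHandle` and a oneshot
/// `Sender` that stops the loop when fired.
///
/// A minimal shutdown sketch (assuming the caller already holds all the refs):
///
/// ```ignore
/// let (handle, shutdown) = start_info_monitor(
///     metadata_manager,
///     hummock_manager,
///     barrier_manager,
///     system_params_controller,
///     meta_metrics,
/// );
/// // ... later, during meta node shutdown:
/// let _ = shutdown.send(());
/// handle.await.unwrap();
/// ```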
pub fn start_info_monitor(
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    barrier_manager: BarrierManagerRef,
    system_params_controller: SystemParamsControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    const COLLECT_INTERVAL_SECONDS: u64 = 60;

    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval =
            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
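        // If a tick is missed (e.g. a refresh ran long), delay the next tick
        // rather than bursting, keeping refreshes roughly evenly spaced.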
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for interval
                _ = monitor_interval.tick() => {},
                // Shutdown monitor
                _ = &mut shutdown_rx => {
                    tracing::info!("Meta info monitor is stopped");
                    return;
                }
            }

            // Fragment and relation info
            refresh_fragment_info_metrics(
                &metadata_manager.catalog_controller,
                &metadata_manager.cluster_controller,
                &hummock_manager,
                meta_metrics.clone(),
            )
            .await;

            refresh_relation_info_metrics(
                &metadata_manager.catalog_controller,
                meta_metrics.clone(),
            )
            .await;

            refresh_backfill_progress_metrics(
                &metadata_manager.catalog_controller,
                &hummock_manager,
                &barrier_manager,
                meta_metrics.clone(),
            )
            .await;

            // System parameter info
            refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
                .await;
        }
    });

    (join_handle, shutdown_tx)
}