risingwave_meta/rpc/
metrics.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::atomic::AtomicU64;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicF64, GenericGaugeVec};
21use prometheus::{
22    GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
23    exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
24    register_histogram_vec_with_registry, register_histogram_with_registry,
25    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
26    register_int_gauge_with_registry,
27};
28use risingwave_common::catalog::{FragmentTypeFlag, TableId};
29use risingwave_common::metrics::{
30    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
31    LabelGuardedUintGaugeVec,
32};
33use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
34use risingwave_common::system_param::reader::SystemParamsRead;
35use risingwave_common::util::stream_graph_visitor::{
36    visit_stream_node_source_backfill, visit_stream_node_stream_scan,
37};
38use risingwave_common::{
39    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
40    register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
41};
42use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
43use risingwave_meta_model::table::TableType;
44use risingwave_meta_model::{ObjectId, WorkerId};
45use risingwave_object_store::object::object_metrics::{
46    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
47};
48use risingwave_pb::common::WorkerType;
49use risingwave_pb::meta::FragmentDistribution;
50use thiserror_ext::AsReport;
51use tokio::sync::oneshot::Sender;
52use tokio::task::JoinHandle;
53
54use crate::barrier::BarrierManagerRef;
55use crate::controller::catalog::CatalogControllerRef;
56use crate::controller::cluster::ClusterControllerRef;
57use crate::controller::system_param::SystemParamsControllerRef;
58use crate::controller::utils::PartialFragmentStateTables;
59use crate::hummock::HummockManagerRef;
60use crate::manager::MetadataManager;
61use crate::rpc::ElectionClientRef;
62
/// Per-fragment backfill information gathered for metrics reporting.
///
/// NOTE(review): the field names mirror a subset of the label names of the
/// `backfill_fragment_progress` gauge registered in `MetaMetrics::new` —
/// presumably instances of this struct are flattened into those labels;
/// confirm at the use site (outside this chunk).
struct BackfillFragmentInfo {
    // Id of the streaming job that owns the backfilling fragment.
    job_id: u32,
    // Id of the backfilling fragment itself.
    fragment_id: u32,
    // State table that persists the backfill progress.
    backfill_state_table_id: u32,
    // Relation (table/source) being backfilled from.
    backfill_target_relation_id: u32,
    // Static description of the backfill kind (e.g. scan vs. source backfill
    // — exact values are assigned outside this chunk).
    backfill_type: &'static str,
    // Epoch associated with the backfill — assumed to be the snapshot epoch
    // the backfill started from; TODO confirm at the assignment site.
    backfill_epoch: u64,
}
71
#[derive(Clone)]
pub struct MetaMetrics {
    // ********************************** Meta ************************************
    /// The number of workers in the cluster.
    pub worker_num: IntGaugeVec,
    /// The roles of all meta nodes in the cluster.
    pub meta_type: IntGaugeVec,

    // ********************************** gRPC ************************************
    /// gRPC latency of meta services
    pub grpc_latency: HistogramVec,

    // ********************************** Barrier ************************************
    /// The duration from barrier injection to commit
    /// It is the sum of inflight-latency, sync-latency and wait-commit-latency
    pub barrier_latency: LabelGuardedHistogramVec,
    /// The duration from barrier complete to commit
    pub barrier_wait_commit_latency: Histogram,
    /// Latency between each barrier send
    pub barrier_send_latency: LabelGuardedHistogramVec,
    /// The number of all barriers. It is the sum of barriers that are in-flight or completed but
    /// waiting for other barriers
    pub all_barrier_nums: LabelGuardedIntGaugeVec,
    /// The number of in-flight barriers
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
    /// The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.
    pub last_committed_barrier_time: IntGaugeVec,
    /// The barrier interval of each database
    pub barrier_interval_by_database: GaugeVec,

    // ********************************** Snapshot Backfill ***************************
    /// The barrier latency in seconds of `table_id` and snapshot backfill `barrier_type`
    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec, // (table_id, barrier_type)
    /// The lags between the upstream epoch and the downstream epoch.
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec, // (table_id, )
    /// The number of inflight barriers of `table_id`
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec, // (table_id, _)

    // ********************************** Recovery ************************************
    /// Number of failed recovery attempts, labeled by recovery type.
    pub recovery_failure_cnt: IntCounterVec,
    /// Latency of the recovery process, labeled by recovery type.
    pub recovery_latency: HistogramVec,

    // ********************************** Hummock ************************************
    /// Max committed epoch
    pub max_committed_epoch: IntGauge,
    /// Min committed epoch
    pub min_committed_epoch: IntGauge,
    /// The number of SSTs in each level
    pub level_sst_num: IntGaugeVec,
    /// The number of SSTs to be merged to next level in each level
    pub level_compact_cnt: IntGaugeVec,
    /// The number of compact tasks
    pub compact_frequency: IntCounterVec,
    /// Size of each level
    pub level_file_size: IntGaugeVec,
    /// Hummock version size
    pub version_size: IntGauge,
    /// The version Id of current version.
    pub current_version_id: IntGauge,
    /// The version id of checkpoint version.
    pub checkpoint_version_id: IntGauge,
    /// The smallest version id that is being pinned by worker nodes.
    pub min_pinned_version_id: IntGauge,
    /// The smallest version id that is being guarded by meta node safe points.
    pub min_safepoint_version_id: IntGauge,
    /// Compaction groups that is in write stop state.
    pub write_stop_compaction_groups: IntGaugeVec,
    /// The number of attempts to trigger full GC.
    pub full_gc_trigger_count: IntGauge,
    /// The number of candidate object to delete after scanning object store.
    pub full_gc_candidate_object_count: Histogram,
    /// The number of object to delete after filtering by meta node.
    pub full_gc_selected_object_count: Histogram,
    /// Hummock version stats
    pub version_stats: IntGaugeVec,
    /// Per materialized view stats in current hummock version
    pub materialized_view_stats: IntGaugeVec,
    /// Total number of objects that is no longer referenced by versions.
    pub stale_object_count: IntGauge,
    /// Total size of objects that is no longer referenced by versions.
    pub stale_object_size: IntGauge,
    /// Total number of objects that is still referenced by non-current versions.
    pub old_version_object_count: IntGauge,
    /// Total size of objects that is still referenced by non-current versions.
    pub old_version_object_size: IntGauge,
    /// Total number of objects that is referenced by time travel.
    pub time_travel_object_count: IntGauge,
    /// Total number of objects that is referenced by current version.
    pub current_version_object_count: IntGauge,
    /// Total size of objects that is referenced by current version.
    pub current_version_object_size: IntGauge,
    /// Total number of objects that includes dangling objects.
    pub total_object_count: IntGauge,
    /// Total size of objects that includes dangling objects.
    pub total_object_size: IntGauge,
    /// Number of objects per table change log.
    pub table_change_log_object_count: IntGaugeVec,
    /// Size of objects per table change log.
    pub table_change_log_object_size: IntGaugeVec,
    /// Min epoch currently retained in table change log.
    pub table_change_log_min_epoch: IntGaugeVec,
    /// The number of hummock version delta log.
    pub delta_log_count: IntGauge,
    /// latency of version checkpoint
    pub version_checkpoint_latency: Histogram,
    /// Latency for hummock manager to acquire lock
    pub hummock_manager_lock_time: HistogramVec,
    /// Latency for hummock manager to really process a request after acquire the lock
    pub hummock_manager_real_process_time: HistogramVec,
    /// The number of compactions from one level to another level that have been skipped
    pub compact_skip_frequency: IntCounterVec,
    /// Bytes of lsm tree needed to reach balance
    pub compact_pending_bytes: IntGaugeVec,
    /// Per level compression ratio
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    /// Per level number of running compaction task
    pub level_compact_task_cnt: IntGaugeVec,
    pub time_after_last_observation: Arc<AtomicU64>,
    /// Level count of L0 compact tasks, labeled by (group, type).
    pub l0_compact_level_count: HistogramVec,
    /// Total size of compact tasks that have been issued to the state store.
    pub compact_task_size: HistogramVec,
    /// File count of compact tasks, labeled by (group, type).
    pub compact_task_file_count: HistogramVec,
    pub compact_task_batch_count: HistogramVec,
    pub split_compaction_group_count: IntCounterVec,
    pub state_table_count: IntGaugeVec,
    pub branched_sst_count: IntGaugeVec,
    pub compact_task_trivial_move_sst_count: HistogramVec,

    pub compaction_event_consumed_latency: Histogram,
    pub compaction_event_loop_iteration_latency: Histogram,
    /// Latency of vacuuming metadata for time travel.
    pub time_travel_vacuum_metadata_latency: Histogram,
    /// Latency of writing metadata for time travel.
    pub time_travel_write_metadata_latency: Histogram,

    // ********************************** Object Store ************************************
    // Object store related metrics (for backup/restore and version checkpoint)
    pub object_store_metric: Arc<ObjectStoreMetrics>,

    // ********************************** Source ************************************
    /// Whether each source is up, labeled by (source_id, source_name).
    pub source_is_up: LabelGuardedIntGaugeVec,
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

    // ********************************** Fragment ************************************
    /// A dummy gauge metrics with its label to be the mapping from actor id to fragment id
    pub actor_info: IntGaugeVec,
    /// A dummy gauge metrics with its label to be the mapping from table id to actor id
    pub table_info: IntGaugeVec,
    /// A dummy gauge metrics with its label to be the mapping from actor id to sink id
    pub sink_info: IntGaugeVec,
    /// A dummy gauge metrics with its label to be relation info
    pub relation_info: IntGaugeVec,
    /// A dummy gauge metrics with its label to be the mapping from database id to database name
    pub database_info: IntGaugeVec,
    /// Backfill progress per fragment
    pub backfill_fragment_progress: IntGaugeVec,

    // ********************************** System Params ************************************
    /// A dummy gauge metric with labels carrying system parameter info.
    /// Labels: (name, value)
    pub system_param_info: IntGaugeVec,

    /// Write throughput of commit epoch for each table
    pub table_write_throughput: IntCounterVec,

    /// The number of compaction groups that have been triggered to move
    pub merge_compaction_group_count: IntCounterVec,

    // ********************************** Auto Schema Change ************************************
    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_latency: LabelGuardedHistogramVec,

    pub time_travel_version_replay_latency: Histogram,

    pub compaction_group_count: IntGauge,
    pub compaction_group_size: IntGaugeVec,
    pub compaction_group_file_count: IntGaugeVec,
    pub compaction_group_throughput: IntGaugeVec,

    // ********************************** Refresh Manager ************************************
    pub refresh_job_duration: LabelGuardedUintGaugeVec,
    pub refresh_job_finish_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_trigger_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_miss_cnt: LabelGuardedIntCounterVec,
}
256
/// Process-wide [`MetaMetrics`] instance, lazily constructed and registered
/// against the global metrics registry on first access.
pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));
259
260impl MetaMetrics {
261    fn new(registry: &Registry) -> Self {
262        let opts = histogram_opts!(
263            "meta_grpc_duration_seconds",
264            "gRPC latency of meta services",
265            exponential_buckets(0.0001, 2.0, 20).unwrap() // max 52s
266        );
267        let grpc_latency =
268            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();
269
270        let opts = histogram_opts!(
271            "meta_barrier_duration_seconds",
272            "barrier latency",
273            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
274        );
275        let barrier_latency =
276            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
277                .unwrap();
278
279        let opts = histogram_opts!(
280            "meta_barrier_wait_commit_duration_seconds",
281            "barrier_wait_commit_latency",
282            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
283        );
284        let barrier_wait_commit_latency =
285            register_histogram_with_registry!(opts, registry).unwrap();
286
287        let opts = histogram_opts!(
288            "meta_barrier_send_duration_seconds",
289            "barrier send latency",
290            exponential_buckets(0.1, 1.5, 19).unwrap() // max 148s
291        );
292        let barrier_send_latency =
293            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
294                .unwrap();
295        let barrier_interval_by_database = register_gauge_vec_with_registry!(
296            "meta_barrier_interval_by_database",
297            "barrier interval of each database",
298            &["database_id"],
299            registry
300        )
301        .unwrap();
302
303        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
304            "all_barrier_nums",
305            "num of of all_barrier",
306            &["database_id"],
307            registry
308        )
309        .unwrap();
310        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
311            "in_flight_barrier_nums",
312            "num of of in_flight_barrier",
313            &["database_id"],
314            registry
315        )
316        .unwrap();
317        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
318            "last_committed_barrier_time",
319            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
320            &["database_id"],
321            registry
322        )
323        .unwrap();
324
325        // snapshot backfill metrics
326        let opts = histogram_opts!(
327            "meta_snapshot_backfill_barrier_duration_seconds",
328            "snapshot backfill barrier latency",
329            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
330        );
331        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
332            opts,
333            &["table_id", "barrier_type"],
334            registry
335        )
336        .unwrap();
337
338        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
339            "meta_snapshot_backfill_upstream_lag",
340            "snapshot backfill upstream_lag",
341            &["table_id"],
342            registry
343        )
344        .unwrap();
345        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
346            "meta_snapshot_backfill_inflight_barrier_num",
347            "snapshot backfill inflight_barrier_num",
348            &["table_id"],
349            registry
350        )
351        .unwrap();
352
353        let max_committed_epoch = register_int_gauge_with_registry!(
354            "storage_max_committed_epoch",
355            "max committed epoch",
356            registry
357        )
358        .unwrap();
359
360        let min_committed_epoch = register_int_gauge_with_registry!(
361            "storage_min_committed_epoch",
362            "min committed epoch",
363            registry
364        )
365        .unwrap();
366
367        let level_sst_num = register_int_gauge_vec_with_registry!(
368            "storage_level_sst_num",
369            "num of SSTs in each level",
370            &["level_index"],
371            registry
372        )
373        .unwrap();
374
375        let level_compact_cnt = register_int_gauge_vec_with_registry!(
376            "storage_level_compact_cnt",
377            "num of SSTs to be merged to next level in each level",
378            &["level_index"],
379            registry
380        )
381        .unwrap();
382
383        let compact_frequency = register_int_counter_vec_with_registry!(
384            "storage_level_compact_frequency",
385            "The number of compactions from one level to another level that have completed or failed.",
386            &["compactor", "group", "task_type", "result"],
387            registry
388        )
389        .unwrap();
390        let compact_skip_frequency = register_int_counter_vec_with_registry!(
391            "storage_skip_compact_frequency",
392            "The number of compactions from one level to another level that have been skipped.",
393            &["level", "type"],
394            registry
395        )
396        .unwrap();
397
398        let version_size =
399            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
400                .unwrap();
401
402        let current_version_id = register_int_gauge_with_registry!(
403            "storage_current_version_id",
404            "current version id",
405            registry
406        )
407        .unwrap();
408
409        let checkpoint_version_id = register_int_gauge_with_registry!(
410            "storage_checkpoint_version_id",
411            "checkpoint version id",
412            registry
413        )
414        .unwrap();
415
416        let min_pinned_version_id = register_int_gauge_with_registry!(
417            "storage_min_pinned_version_id",
418            "min pinned version id",
419            registry
420        )
421        .unwrap();
422
423        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
424            "storage_write_stop_compaction_groups",
425            "compaction groups of write stop state",
426            &["compaction_group_id"],
427            registry
428        )
429        .unwrap();
430
431        let full_gc_trigger_count = register_int_gauge_with_registry!(
432            "storage_full_gc_trigger_count",
433            "the number of attempts to trigger full GC",
434            registry
435        )
436        .unwrap();
437
438        let opts = histogram_opts!(
439            "storage_full_gc_candidate_object_count",
440            "the number of candidate object to delete after scanning object store",
441            exponential_buckets(1.0, 10.0, 6).unwrap()
442        );
443        let full_gc_candidate_object_count =
444            register_histogram_with_registry!(opts, registry).unwrap();
445
446        let opts = histogram_opts!(
447            "storage_full_gc_selected_object_count",
448            "the number of object to delete after filtering by meta node",
449            exponential_buckets(1.0, 10.0, 6).unwrap()
450        );
451        let full_gc_selected_object_count =
452            register_histogram_with_registry!(opts, registry).unwrap();
453
454        let min_safepoint_version_id = register_int_gauge_with_registry!(
455            "storage_min_safepoint_version_id",
456            "min safepoint version id",
457            registry
458        )
459        .unwrap();
460
461        let level_file_size = register_int_gauge_vec_with_registry!(
462            "storage_level_total_file_size",
463            "KBs total file bytes in each level",
464            &["level_index"],
465            registry
466        )
467        .unwrap();
468
469        let version_stats = register_int_gauge_vec_with_registry!(
470            "storage_version_stats",
471            "per table stats in current hummock version",
472            &["table_id", "metric"],
473            registry
474        )
475        .unwrap();
476
477        let materialized_view_stats = register_int_gauge_vec_with_registry!(
478            "storage_materialized_view_stats",
479            "per materialized view stats in current hummock version",
480            &["table_id", "metric"],
481            registry
482        )
483        .unwrap();
484
485        let stale_object_count = register_int_gauge_with_registry!(
486            "storage_stale_object_count",
487            "total number of objects that is no longer referenced by versions.",
488            registry
489        )
490        .unwrap();
491
492        let stale_object_size = register_int_gauge_with_registry!(
493            "storage_stale_object_size",
494            "total size of objects that is no longer referenced by versions.",
495            registry
496        )
497        .unwrap();
498
499        let old_version_object_count = register_int_gauge_with_registry!(
500            "storage_old_version_object_count",
501            "total number of objects that is still referenced by non-current versions",
502            registry
503        )
504        .unwrap();
505
506        let old_version_object_size = register_int_gauge_with_registry!(
507            "storage_old_version_object_size",
508            "total size of objects that is still referenced by non-current versions",
509            registry
510        )
511        .unwrap();
512
513        let current_version_object_count = register_int_gauge_with_registry!(
514            "storage_current_version_object_count",
515            "total number of objects that is referenced by current version",
516            registry
517        )
518        .unwrap();
519
520        let current_version_object_size = register_int_gauge_with_registry!(
521            "storage_current_version_object_size",
522            "total size of objects that is referenced by current version",
523            registry
524        )
525        .unwrap();
526
527        let total_object_count = register_int_gauge_with_registry!(
528            "storage_total_object_count",
529            "Total number of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
530            registry
531        ).unwrap();
532
533        let total_object_size = register_int_gauge_with_registry!(
534            "storage_total_object_size",
535            "Total size of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
536            registry
537        ).unwrap();
538
539        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
540            "storage_table_change_log_object_count",
541            "per table change log object count",
542            &["table_id"],
543            registry
544        )
545        .unwrap();
546
547        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
548            "storage_table_change_log_object_size",
549            "per table change log object size",
550            &["table_id"],
551            registry
552        )
553        .unwrap();
554
555        let table_change_log_min_epoch = register_int_gauge_vec_with_registry!(
556            "storage_table_change_log_min_epoch",
557            "min epoch currently retained in table change log",
558            &["table_id"],
559            registry
560        )
561        .unwrap();
562
563        let time_travel_object_count = register_int_gauge_with_registry!(
564            "storage_time_travel_object_count",
565            "total number of objects that is referenced by time travel.",
566            registry
567        )
568        .unwrap();
569
570        let delta_log_count = register_int_gauge_with_registry!(
571            "storage_delta_log_count",
572            "total number of hummock version delta log",
573            registry
574        )
575        .unwrap();
576
577        let opts = histogram_opts!(
578            "storage_version_checkpoint_latency",
579            "hummock version checkpoint latency",
580            exponential_buckets(0.1, 1.5, 20).unwrap()
581        );
582        let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();
583
584        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
585            "hummock_manager_lock_time",
586            "latency for hummock manager to acquire the rwlock",
587            &["lock_name", "lock_type"],
588            registry
589        )
590        .unwrap();
591
592        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
593            "meta_hummock_manager_real_process_time",
594            "latency for hummock manager to really process the request",
595            &["method"],
596            registry
597        )
598        .unwrap();
599
600        let worker_num = register_int_gauge_vec_with_registry!(
601            "worker_num",
602            "number of nodes in the cluster",
603            &["worker_type"],
604            registry,
605        )
606        .unwrap();
607
608        let meta_type = register_int_gauge_vec_with_registry!(
609            "meta_num",
610            "role of meta nodes in the cluster",
611            &["worker_addr", "role"],
612            registry,
613        )
614        .unwrap();
615
616        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
617            "storage_compact_pending_bytes",
618            "bytes of lsm tree needed to reach balance",
619            &["group"],
620            registry
621        )
622        .unwrap();
623
624        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
625            "storage_compact_level_compression_ratio",
626            "compression ratio of each level of the lsm tree",
627            &["group", "level", "algorithm"],
628            registry
629        )
630        .unwrap();
631
632        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
633            "storage_level_compact_task_cnt",
634            "num of compact_task organized by group and level",
635            &["task"],
636            registry
637        )
638        .unwrap();
639
640        let time_travel_vacuum_metadata_latency = register_histogram_with_registry!(
641            histogram_opts!(
642                "storage_time_travel_vacuum_metadata_latency",
643                "Latency of vacuuming metadata for time travel",
644                exponential_buckets(0.1, 1.5, 20).unwrap()
645            ),
646            registry
647        )
648        .unwrap();
649        let time_travel_write_metadata_latency = register_histogram_with_registry!(
650            histogram_opts!(
651                "storage_time_travel_write_metadata_latency",
652                "Latency of writing metadata for time travel",
653                exponential_buckets(0.1, 1.5, 20).unwrap()
654            ),
655            registry
656        )
657        .unwrap();
658
659        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
660
661        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
662            "recovery_failure_cnt",
663            "Number of failed recovery attempts",
664            &["recovery_type"],
665            registry
666        )
667        .unwrap();
668        let opts = histogram_opts!(
669            "recovery_latency",
670            "Latency of the recovery process",
671            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
672        );
673        let recovery_latency =
674            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();
675
676        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
677            "auto_schema_change_failure_cnt",
678            "Number of failed auto schema change",
679            &["table_id", "table_name"],
680            registry
681        )
682        .unwrap();
683
684        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
685            "auto_schema_change_success_cnt",
686            "Number of success auto schema change",
687            &["table_id", "table_name"],
688            registry
689        )
690        .unwrap();
691
692        let opts = histogram_opts!(
693            "auto_schema_change_latency",
694            "Latency of the auto schema change process",
695            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
696        );
697        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
698            opts,
699            &["table_id", "table_name"],
700            registry
701        )
702        .unwrap();
703
704        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
705            "source_status_is_up",
706            "source is up or not",
707            &["source_id", "source_name"],
708            registry
709        )
710        .unwrap();
711        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());
712
713        let actor_info = register_int_gauge_vec_with_registry!(
714            "actor_info",
715            "Mapping from actor id to (fragment id, compute node)",
716            &["actor_id", "fragment_id", "compute_node"],
717            registry
718        )
719        .unwrap();
720
721        let table_info = register_int_gauge_vec_with_registry!(
722            "table_info",
723            "Mapping from table id to (actor id, table name)",
724            &[
725                "materialized_view_id",
726                "table_id",
727                "fragment_id",
728                "table_name",
729                "table_type",
730                "compaction_group_id"
731            ],
732            registry
733        )
734        .unwrap();
735
736        let sink_info = register_int_gauge_vec_with_registry!(
737            "sink_info",
738            "Mapping from actor id to (actor id, sink name)",
739            &["actor_id", "sink_id", "sink_name",],
740            registry
741        )
742        .unwrap();
743
744        let relation_info = register_int_gauge_vec_with_registry!(
745            "relation_info",
746            "Information of the database relation (table/source/sink/materialized view/index/internal)",
747            &["id", "database", "schema", "name", "resource_group", "type"],
748            registry
749        )
750        .unwrap();
751
752        let database_info = register_int_gauge_vec_with_registry!(
753            "database_info",
754            "Mapping from database id to database name",
755            &["database_id", "database_name"],
756            registry
757        )
758        .unwrap();
759
760        let backfill_fragment_progress = register_int_gauge_vec_with_registry!(
761            "backfill_fragment_progress",
762            "Backfill progress per fragment",
763            &[
764                "job_id",
765                "fragment_id",
766                "backfill_state_table_id",
767                "backfill_target_relation_id",
768                "backfill_target_relation_name",
769                "backfill_target_relation_type",
770                "backfill_type",
771                "backfill_epoch",
772                "upstream_type",
773                "backfill_progress",
774            ],
775            registry
776        )
777        .unwrap();
778
779        let l0_compact_level_count = register_histogram_vec_with_registry!(
780            "storage_l0_compact_level_count",
781            "level_count of l0 compact task",
782            &["group", "type"],
783            registry
784        )
785        .unwrap();
786
787        // System parameter info
788        let system_param_info = register_int_gauge_vec_with_registry!(
789            "system_param_info",
790            "Information of system parameters",
791            &["name", "value"],
792            registry
793        )
794        .unwrap();
795
796        let opts = histogram_opts!(
797            "storage_compact_task_size",
798            "Total size of compact that have been issued to state store",
799            exponential_buckets(1048576.0, 2.0, 16).unwrap()
800        );
801
802        let compact_task_size =
803            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();
804
805        let compact_task_file_count = register_histogram_vec_with_registry!(
806            "storage_compact_task_file_count",
807            "file count of compact task",
808            &["group", "type"],
809            registry
810        )
811        .unwrap();
812        let opts = histogram_opts!(
813            "storage_compact_task_batch_count",
814            "count of compact task batch",
815            exponential_buckets(1.0, 2.0, 8).unwrap()
816        );
817        let compact_task_batch_count =
818            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();
819
820        let table_write_throughput = register_int_counter_vec_with_registry!(
821            "storage_commit_write_throughput",
822            "The number of compactions from one level to another level that have been skipped.",
823            &["table_id"],
824            registry
825        )
826        .unwrap();
827
828        let split_compaction_group_count = register_int_counter_vec_with_registry!(
829            "storage_split_compaction_group_count",
830            "Count of trigger split compaction group",
831            &["group"],
832            registry
833        )
834        .unwrap();
835
836        let state_table_count = register_int_gauge_vec_with_registry!(
837            "storage_state_table_count",
838            "Count of stable table per compaction group",
839            &["group"],
840            registry
841        )
842        .unwrap();
843
844        let branched_sst_count = register_int_gauge_vec_with_registry!(
845            "storage_branched_sst_count",
846            "Count of branched sst per compaction group",
847            &["group"],
848            registry
849        )
850        .unwrap();
851
852        let opts = histogram_opts!(
853            "storage_compaction_event_consumed_latency",
854            "The latency(ms) of each event being consumed",
855            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
856        );
857        let compaction_event_consumed_latency =
858            register_histogram_with_registry!(opts, registry).unwrap();
859
860        let opts = histogram_opts!(
861            "storage_compaction_event_loop_iteration_latency",
862            "The latency(ms) of each iteration of the compaction event loop",
863            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
864        );
865        let compaction_event_loop_iteration_latency =
866            register_histogram_with_registry!(opts, registry).unwrap();
867
868        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
869            "storage_merge_compaction_group_count",
870            "Count of trigger merge compaction group",
871            &["group"],
872            registry
873        )
874        .unwrap();
875
876        let opts = histogram_opts!(
877            "storage_time_travel_version_replay_latency",
878            "The latency(ms) of replaying a hummock version for time travel",
879            exponential_buckets(0.01, 10.0, 6).unwrap()
880        );
881        let time_travel_version_replay_latency =
882            register_histogram_with_registry!(opts, registry).unwrap();
883
884        let compaction_group_count = register_int_gauge_with_registry!(
885            "storage_compaction_group_count",
886            "The number of compaction groups",
887            registry,
888        )
889        .unwrap();
890
891        let compaction_group_size = register_int_gauge_vec_with_registry!(
892            "storage_compaction_group_size",
893            "The size of compaction group",
894            &["group"],
895            registry
896        )
897        .unwrap();
898
899        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
900            "storage_compaction_group_file_count",
901            "The file count of compaction group",
902            &["group"],
903            registry
904        )
905        .unwrap();
906
907        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
908            "storage_compaction_group_throughput",
909            "The throughput of compaction group",
910            &["group"],
911            registry
912        )
913        .unwrap();
914
915        let opts = histogram_opts!(
916            "storage_compact_task_trivial_move_sst_count",
917            "sst count of compact trivial-move task",
918            exponential_buckets(1.0, 2.0, 8).unwrap()
919        );
920        let compact_task_trivial_move_sst_count =
921            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();
922
923        let refresh_job_duration = register_guarded_uint_gauge_vec_with_registry!(
924            "meta_refresh_job_duration",
925            "The duration of refresh job",
926            &["table_id", "status"],
927            registry
928        )
929        .unwrap();
930        let refresh_job_finish_cnt = register_guarded_int_counter_vec_with_registry!(
931            "meta_refresh_job_finish_cnt",
932            "The number of finished refresh jobs",
933            &["table_id", "status"],
934            registry
935        )
936        .unwrap();
937        let refresh_cron_job_trigger_cnt = register_guarded_int_counter_vec_with_registry!(
938            "meta_refresh_cron_job_trigger_cnt",
939            "The number of cron refresh jobs triggered",
940            &["table_id"],
941            registry
942        )
943        .unwrap();
944        let refresh_cron_job_miss_cnt = register_guarded_int_counter_vec_with_registry!(
945            "meta_refresh_cron_job_miss_cnt",
946            "The number of cron refresh jobs missed",
947            &["table_id"],
948            registry
949        )
950        .unwrap();
951
952        Self {
953            grpc_latency,
954            barrier_latency,
955            barrier_wait_commit_latency,
956            barrier_send_latency,
957            all_barrier_nums,
958            in_flight_barrier_nums,
959            last_committed_barrier_time,
960            barrier_interval_by_database,
961            snapshot_backfill_barrier_latency,
962            snapshot_backfill_lag,
963            snapshot_backfill_inflight_barrier_num,
964            recovery_failure_cnt,
965            recovery_latency,
966
967            max_committed_epoch,
968            min_committed_epoch,
969            level_sst_num,
970            level_compact_cnt,
971            compact_frequency,
972            compact_skip_frequency,
973            level_file_size,
974            version_size,
975            version_stats,
976            materialized_view_stats,
977            stale_object_count,
978            stale_object_size,
979            old_version_object_count,
980            old_version_object_size,
981            time_travel_object_count,
982            current_version_object_count,
983            current_version_object_size,
984            total_object_count,
985            total_object_size,
986            table_change_log_object_count,
987            table_change_log_object_size,
988            table_change_log_min_epoch,
989            delta_log_count,
990            version_checkpoint_latency,
991            current_version_id,
992            checkpoint_version_id,
993            min_pinned_version_id,
994            min_safepoint_version_id,
995            write_stop_compaction_groups,
996            full_gc_trigger_count,
997            full_gc_candidate_object_count,
998            full_gc_selected_object_count,
999            hummock_manager_lock_time,
1000            hummock_manager_real_process_time,
1001            time_after_last_observation: Arc::new(AtomicU64::new(0)),
1002            worker_num,
1003            meta_type,
1004            compact_pending_bytes,
1005            compact_level_compression_ratio,
1006            level_compact_task_cnt,
1007            object_store_metric,
1008            source_is_up,
1009            source_enumerator_metrics,
1010            actor_info,
1011            table_info,
1012            sink_info,
1013            relation_info,
1014            database_info,
1015            backfill_fragment_progress,
1016            system_param_info,
1017            l0_compact_level_count,
1018            compact_task_size,
1019            compact_task_file_count,
1020            compact_task_batch_count,
1021            compact_task_trivial_move_sst_count,
1022            table_write_throughput,
1023            split_compaction_group_count,
1024            state_table_count,
1025            branched_sst_count,
1026            compaction_event_consumed_latency,
1027            compaction_event_loop_iteration_latency,
1028            auto_schema_change_failure_cnt,
1029            auto_schema_change_success_cnt,
1030            auto_schema_change_latency,
1031            merge_compaction_group_count,
1032            time_travel_version_replay_latency,
1033            compaction_group_count,
1034            compaction_group_size,
1035            compaction_group_file_count,
1036            compaction_group_throughput,
1037            refresh_job_duration,
1038            refresh_job_finish_cnt,
1039            refresh_cron_job_trigger_cnt,
1040            refresh_cron_job_miss_cnt,
1041            time_travel_vacuum_metadata_latency,
1042            time_travel_write_metadata_latency,
1043        }
1044    }
1045
    /// Builds a fresh `MetaMetrics` registered against the given `registry`,
    /// so tests can use an isolated registry instead of the global one.
    #[cfg(test)]
    pub fn for_test(registry: &Registry) -> Self {
        Self::new(registry)
    }
1050}
/// The default instance is a clone of the process-wide `GLOBAL_META_METRICS`
/// handle rather than a freshly registered set of collectors.
impl Default for MetaMetrics {
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}
1056
1057/// Refresh `system_param_info` metrics by reading current system parameters.
1058pub async fn refresh_system_param_info_metrics(
1059    system_params_controller: &SystemParamsControllerRef,
1060    meta_metrics: Arc<MetaMetrics>,
1061) {
1062    let params_info = system_params_controller.get_params().await.get_all();
1063
1064    meta_metrics.system_param_info.reset();
1065    for info in params_info {
1066        meta_metrics
1067            .system_param_info
1068            .with_label_values(&[info.name, &info.value])
1069            .set(1);
1070    }
1071}
1072
1073pub fn start_worker_info_monitor(
1074    metadata_manager: MetadataManager,
1075    election_client: ElectionClientRef,
1076    interval: Duration,
1077    meta_metrics: Arc<MetaMetrics>,
1078) -> (JoinHandle<()>, Sender<()>) {
1079    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1080    let join_handle = tokio::spawn(async move {
1081        let mut monitor_interval = tokio::time::interval(interval);
1082        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1083        loop {
1084            tokio::select! {
1085                // Wait for interval
1086                _ = monitor_interval.tick() => {},
1087                // Shutdown monitor
1088                _ = &mut shutdown_rx => {
1089                    tracing::info!("Worker number monitor is stopped");
1090                    return;
1091                }
1092            }
1093
1094            let node_map = match metadata_manager.count_worker_node().await {
1095                Ok(node_map) => node_map,
1096                Err(err) => {
1097                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
1098                    continue;
1099                }
1100            };
1101
1102            // Reset metrics to clean the stale labels e.g. invalid lease ids
1103            meta_metrics.worker_num.reset();
1104            meta_metrics.meta_type.reset();
1105
1106            for (worker_type, worker_num) in node_map {
1107                meta_metrics
1108                    .worker_num
1109                    .with_label_values(&[(worker_type.as_str_name())])
1110                    .set(worker_num as i64);
1111            }
1112            if let Ok(meta_members) = election_client.get_members().await {
1113                meta_metrics
1114                    .worker_num
1115                    .with_label_values(&[WorkerType::Meta.as_str_name()])
1116                    .set(meta_members.len() as i64);
1117                meta_members.into_iter().for_each(|m| {
1118                    let role = if m.is_leader { "leader" } else { "follower" };
1119                    meta_metrics
1120                        .meta_type
1121                        .with_label_values(&[m.id.as_str(), role])
1122                        .set(1);
1123                });
1124            }
1125        }
1126    });
1127
1128    (join_handle, shutdown_tx)
1129}
1130
/// Refreshes the `actor_info`, `table_info` and `sink_info` gauge families from
/// the current cluster and catalog state.
///
/// These are "info"-style metrics: each sample is a dummy gauge set to 1 whose
/// payload lives entirely in the label set. Any listing failure is logged and
/// aborts this refresh round, leaving the previously-reported values in place
/// until the next round.
pub async fn refresh_fragment_info_metrics(
    catalog_controller: &CatalogControllerRef,
    cluster_controller: &ClusterControllerRef,
    hummock_manager: &HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let worker_nodes = match cluster_controller
        .list_workers(Some(WorkerType::ComputeNode.into()), None)
        .await
    {
        Ok(worker_nodes) => worker_nodes,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list worker node");
            return;
        }
    };
    let actor_locations = match catalog_controller.list_actor_locations() {
        Ok(actor_locations) => actor_locations,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
            return;
        }
    };
    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
        Ok(sink_actor_mapping) => sink_actor_mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
            return;
        }
    };
    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
        Ok(fragment_state_tables) => fragment_state_tables,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
            return;
        }
    };
    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
        Ok(mapping) => mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
            return;
        }
    };

    // Worker id -> "host:port" of the compute node; empty string when the
    // worker carries no host info.
    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
        .into_iter()
        .map(|worker_node| {
            let addr = match worker_node.host {
                Some(host) => format!("{}:{}", host.host, host.port),
                None => "".to_owned(),
            };
            (worker_node.id, addr)
        })
        .collect();
    let table_compaction_group_id_mapping = hummock_manager
        .get_table_compaction_group_id_mapping()
        .await;

    // Start fresh with a reset to clear all outdated labels. This is safe since we always
    // report full info on each interval.
    meta_metrics.actor_info.reset();
    meta_metrics.table_info.reset();
    meta_metrics.sink_info.reset();
    for actor_location in actor_locations {
        let actor_id_str = actor_location.actor_id.to_string();
        let fragment_id_str = actor_location.fragment_id.to_string();
        // Report a dummy gauge metrics with (fragment id, actor id, node
        // address) as its label
        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
            meta_metrics
                .actor_info
                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
                .set(1);
        }
    }
    // One `sink_info` sample per (actor, sink) pair.
    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
        let sink_id_str = sink_id.to_string();
        for actor_id in actor_ids {
            let actor_id_str = actor_id.to_string();
            meta_metrics
                .sink_info
                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
                .set(1);
        }
    }
    // One `table_info` sample per state table of each fragment. Table name,
    // type and compaction group id fall back to "unknown" when not resolvable
    // from the mappings fetched above.
    for PartialFragmentStateTables {
        fragment_id,
        job_id,
        state_table_ids,
    } in fragment_state_tables
    {
        let fragment_id_str = fragment_id.to_string();
        let job_id_str = job_id.to_string();
        for table_id in state_table_ids.into_inner() {
            let table_id_str = table_id.to_string();
            let (table_name, table_type) = table_name_and_type_mapping
                .get(&table_id)
                .cloned()
                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
            let compaction_group_id = table_compaction_group_id_mapping
                .get(&table_id)
                .map(|cg_id| cg_id.to_string())
                .unwrap_or_else(|| "unknown".to_owned());
            meta_metrics
                .table_info
                .with_label_values(&[
                    &job_id_str,
                    &table_id_str,
                    &fragment_id_str,
                    &table_name,
                    &table_type,
                    &compaction_group_id,
                ])
                .set(1);
        }
    }
}
1249
1250pub async fn refresh_relation_info_metrics(
1251    catalog_controller: &CatalogControllerRef,
1252    meta_metrics: Arc<MetaMetrics>,
1253) {
1254    let table_objects = match catalog_controller.list_table_objects().await {
1255        Ok(table_objects) => table_objects,
1256        Err(err) => {
1257            tracing::warn!(error=%err.as_report(), "fail to get table objects");
1258            return;
1259        }
1260    };
1261
1262    let source_objects = match catalog_controller.list_source_objects().await {
1263        Ok(source_objects) => source_objects,
1264        Err(err) => {
1265            tracing::warn!(error=%err.as_report(), "fail to get source objects");
1266            return;
1267        }
1268    };
1269
1270    let sink_objects = match catalog_controller.list_sink_objects().await {
1271        Ok(sink_objects) => sink_objects,
1272        Err(err) => {
1273            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
1274            return;
1275        }
1276    };
1277
1278    meta_metrics.relation_info.reset();
1279
1280    for (id, db, schema, name, resource_group, table_type) in table_objects {
1281        let relation_type = match table_type {
1282            TableType::Table => "table",
1283            TableType::MaterializedView => "materialized_view",
1284            TableType::Index | TableType::VectorIndex => "index",
1285            TableType::Internal => "internal",
1286        };
1287        meta_metrics
1288            .relation_info
1289            .with_label_values(&[
1290                &id.to_string(),
1291                &db,
1292                &schema,
1293                &name,
1294                &resource_group,
1295                &relation_type.to_owned(),
1296            ])
1297            .set(1);
1298    }
1299
1300    for (id, db, schema, name, resource_group) in source_objects {
1301        meta_metrics
1302            .relation_info
1303            .with_label_values(&[
1304                &id.to_string(),
1305                &db,
1306                &schema,
1307                &name,
1308                &resource_group,
1309                &"source".to_owned(),
1310            ])
1311            .set(1);
1312    }
1313
1314    for (id, db, schema, name, resource_group) in sink_objects {
1315        meta_metrics
1316            .relation_info
1317            .with_label_values(&[
1318                &id.to_string(),
1319                &db,
1320                &schema,
1321                &name,
1322                &resource_group,
1323                &"sink".to_owned(),
1324            ])
1325            .set(1);
1326    }
1327}
1328
1329pub async fn refresh_database_info_metrics(
1330    catalog_controller: &CatalogControllerRef,
1331    meta_metrics: Arc<MetaMetrics>,
1332) {
1333    let databases = match catalog_controller.list_databases().await {
1334        Ok(databases) => databases,
1335        Err(err) => {
1336            tracing::warn!(error=%err.as_report(), "fail to get databases");
1337            return;
1338        }
1339    };
1340
1341    meta_metrics.database_info.reset();
1342
1343    for db in databases {
1344        meta_metrics
1345            .database_info
1346            .with_label_values(&[&db.id.to_string(), &db.name])
1347            .set(1);
1348    }
1349}
1350
1351fn extract_backfill_fragment_info(
1352    distribution: &FragmentDistribution,
1353) -> Option<BackfillFragmentInfo> {
1354    let backfill_type =
1355        if distribution.fragment_type_mask & FragmentTypeFlag::SourceScan as u32 != 0 {
1356            "SOURCE"
1357        } else if distribution.fragment_type_mask
1358            & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32
1359                | FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32)
1360            != 0
1361        {
1362            "SNAPSHOT_BACKFILL"
1363        } else if distribution.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0 {
1364            "ARRANGEMENT_OR_NO_SHUFFLE"
1365        } else {
1366            return None;
1367        };
1368
1369    let stream_node = distribution.node.as_ref()?;
1370    let mut info = None;
1371    match backfill_type {
1372        "SOURCE" => {
1373            visit_stream_node_source_backfill(stream_node, |node| {
1374                info = Some(BackfillFragmentInfo {
1375                    job_id: distribution.table_id.as_raw_id(),
1376                    fragment_id: distribution.fragment_id.as_raw_id(),
1377                    backfill_state_table_id: node
1378                        .state_table
1379                        .as_ref()
1380                        .map(|table| table.id.as_raw_id())
1381                        .unwrap_or_default(),
1382                    backfill_target_relation_id: node.upstream_source_id.as_raw_id(),
1383                    backfill_type,
1384                    backfill_epoch: 0,
1385                });
1386            });
1387        }
1388        "SNAPSHOT_BACKFILL" | "ARRANGEMENT_OR_NO_SHUFFLE" => {
1389            visit_stream_node_stream_scan(stream_node, |node| {
1390                info = Some(BackfillFragmentInfo {
1391                    job_id: distribution.table_id.as_raw_id(),
1392                    fragment_id: distribution.fragment_id.as_raw_id(),
1393                    backfill_state_table_id: node
1394                        .state_table
1395                        .as_ref()
1396                        .map(|table| table.id.as_raw_id())
1397                        .unwrap_or_default(),
1398                    backfill_target_relation_id: node.table_id.as_raw_id(),
1399                    backfill_type,
1400                    backfill_epoch: node.snapshot_backfill_epoch.unwrap_or_default(),
1401                });
1402            });
1403        }
1404        _ => {}
1405    }
1406
1407    info
1408}
1409
/// Refreshes the `backfill_fragment_progress` info metric: one gauge (set to
/// 1) per backfilling fragment, with the progress itself rendered into the
/// `backfill_progress` label as a human-readable string.
///
/// Joins three data sources keyed by (job id, fragment id): the fragment graph
/// (which fragments are backfilling and toward which relation), the barrier
/// manager (runtime consumed-row progress), and hummock version stats (total
/// key counts used as the percentage denominator). Any listing failure is
/// logged and aborts this refresh round, leaving the previously-reported
/// values in place until the next round.
pub async fn refresh_backfill_progress_metrics(
    catalog_controller: &CatalogControllerRef,
    hummock_manager: &HummockManagerRef,
    barrier_manager: &BarrierManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let fragment_descs = match catalog_controller.list_fragment_descs_with_node(true).await {
        Ok(fragment_descs) => fragment_descs,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list creating fragment descs");
            return;
        }
    };

    // Backfill fragments extracted from the fragment graph, keyed by
    // (job id, fragment id). Non-backfill fragments are filtered out.
    let backfill_infos: HashMap<(u32, u32), BackfillFragmentInfo> = fragment_descs
        .iter()
        .filter_map(|(distribution, _)| extract_backfill_fragment_info(distribution))
        .map(|info| ((info.job_id, info.fragment_id), info))
        .collect();

    let fragment_progresses = match barrier_manager.get_fragment_backfill_progress().await {
        Ok(progress) => progress,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment backfill progress");
            return;
        }
    };

    // Runtime progress re-keyed the same way so it can be joined against
    // `backfill_infos` below.
    let progress_by_fragment: HashMap<(u32, u32), _> = fragment_progresses
        .into_iter()
        .map(|progress| {
            (
                (
                    progress.job_id.as_raw_id(),
                    progress.fragment_id.as_raw_id(),
                ),
                progress,
            )
        })
        .collect();

    let version_stats = hummock_manager.get_version_stats().await;

    // Resolve the target relations' (database, schema, name, type) for labels.
    let relation_ids: HashSet<_> = backfill_infos
        .values()
        .map(|info| ObjectId::new(info.backfill_target_relation_id))
        .collect();
    let relation_objects = match catalog_controller
        .list_relation_objects_by_ids(&relation_ids)
        .await
    {
        Ok(relation_objects) => relation_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get relation objects");
            return;
        }
    };

    let mut relation_info = HashMap::new();
    for (id, db, schema, name, rel_type) in relation_objects {
        relation_info.insert(id.as_raw_id(), (db, schema, name, rel_type));
    }

    // Clear stale labels before re-reporting the full set.
    meta_metrics.backfill_fragment_progress.reset();

    for info in backfill_infos.values() {
        let progress = progress_by_fragment.get(&(info.job_id, info.fragment_id));
        // Relation metadata may be unresolvable; fall back to "unknown" labels
        // rather than dropping the sample.
        let (db, schema, name, rel_type) = relation_info
            .get(&info.backfill_target_relation_id)
            .cloned()
            .unwrap_or_else(|| {
                (
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                )
            });

        let job_id_str = info.job_id.to_string();
        let fragment_id_str = info.fragment_id.to_string();
        let backfill_state_table_id_str = info.backfill_state_table_id.to_string();
        let backfill_target_relation_id_str = info.backfill_target_relation_id.to_string();
        let backfill_target_relation_name_str = format!("{db}.{schema}.{name}");
        let backfill_target_relation_type_str = rel_type;
        let backfill_type_str = info.backfill_type.to_owned();
        let backfill_epoch_str = info.backfill_epoch.to_string();
        // Percentage denominator: the target relation's total key count from
        // the hummock version stats, if available.
        let total_key_count = version_stats
            .table_stats
            .get(&TableId::new(info.backfill_target_relation_id))
            .map(|stats| stats.total_key_count as u64);

        // Render the progress label:
        // - SOURCE backfill reports consumed rows only (no known total);
        // - other backfills report a percentage (capped at 100%) plus
        //   consumed/total counts; a done fragment always shows 100%;
        // - missing progress renders as the zero form of either shape.
        let progress_label = match (info.backfill_type, progress) {
            ("SOURCE", Some(progress)) => format!("{} consumed rows", progress.consumed_rows),
            ("SOURCE", None) => "0 consumed rows".to_owned(),
            (_, Some(progress)) if progress.done => {
                let total = total_key_count.unwrap_or(0);
                format!("100.0000% ({}/{})", total, total)
            }
            (_, Some(progress)) => {
                let total = total_key_count.unwrap_or(0);
                if total == 0 {
                    // Empty target: treat as complete to avoid division by zero.
                    "100.0000% (0/0)".to_owned()
                } else {
                    let raw = (progress.consumed_rows as f64) / (total as f64) * 100.0;
                    format!(
                        "{:.4}% ({}/{})",
                        raw.min(100.0),
                        progress.consumed_rows,
                        total
                    )
                }
            }
            (_, None) => "0.0000% (0/0)".to_owned(),
        };
        let upstream_type_str = progress
            .map(|progress| progress.upstream_type.to_string())
            .unwrap_or_else(|| "Unknown".to_owned());

        meta_metrics
            .backfill_fragment_progress
            .with_label_values(&[
                &job_id_str,
                &fragment_id_str,
                &backfill_state_table_id_str,
                &backfill_target_relation_id_str,
                &backfill_target_relation_name_str,
                &backfill_target_relation_type_str,
                &backfill_type_str,
                &backfill_epoch_str,
                &upstream_type_str,
                &progress_label,
            ])
            .set(1);
    }
}
1546
1547pub fn start_info_monitor(
1548    metadata_manager: MetadataManager,
1549    hummock_manager: HummockManagerRef,
1550    barrier_manager: BarrierManagerRef,
1551    system_params_controller: SystemParamsControllerRef,
1552    meta_metrics: Arc<MetaMetrics>,
1553) -> (JoinHandle<()>, Sender<()>) {
1554    const COLLECT_INTERVAL_SECONDS: u64 = 60;
1555
1556    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1557    let join_handle = tokio::spawn(async move {
1558        let mut monitor_interval =
1559            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
1560        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1561        loop {
1562            tokio::select! {
1563                // Wait for interval
1564                _ = monitor_interval.tick() => {},
1565                // Shutdown monitor
1566                _ = &mut shutdown_rx => {
1567                    tracing::info!("Meta info monitor is stopped");
1568                    return;
1569                }
1570            }
1571
1572            // Fragment and relation info
1573            refresh_fragment_info_metrics(
1574                &metadata_manager.catalog_controller,
1575                &metadata_manager.cluster_controller,
1576                &hummock_manager,
1577                meta_metrics.clone(),
1578            )
1579            .await;
1580
1581            refresh_relation_info_metrics(
1582                &metadata_manager.catalog_controller,
1583                meta_metrics.clone(),
1584            )
1585            .await;
1586
1587            refresh_database_info_metrics(
1588                &metadata_manager.catalog_controller,
1589                meta_metrics.clone(),
1590            )
1591            .await;
1592
1593            refresh_backfill_progress_metrics(
1594                &metadata_manager.catalog_controller,
1595                &hummock_manager,
1596                &barrier_manager,
1597                meta_metrics.clone(),
1598            )
1599            .await;
1600
1601            // System parameter info
1602            refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
1603                .await;
1604        }
1605    });
1606
1607    (join_handle, shutdown_tx)
1608}