risingwave_meta/rpc/metrics.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicF64, GenericGaugeVec};
use prometheus::{
    GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use risingwave_common::catalog::{FragmentTypeFlag, TableId};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
    LabelGuardedUintGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_source_backfill, visit_stream_node_stream_scan,
};
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
};
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::{ObjectId, WorkerId};
use risingwave_object_store::object::object_metrics::{
    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
};
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::FragmentDistribution;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::ClusterControllerRef;
use crate::controller::system_param::SystemParamsControllerRef;
use crate::controller::utils::PartialFragmentStateTables;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::rpc::ElectionClientRef;

struct BackfillFragmentInfo {
    job_id: u32,
    fragment_id: u32,
    backfill_state_table_id: u32,
    backfill_target_relation_id: u32,
    backfill_type: &'static str,
    backfill_epoch: u64,
}
#[derive(Clone)]
pub struct MetaMetrics {
    // ********************************** Meta ************************************
    /// The number of workers in the cluster.
    pub worker_num: IntGaugeVec,
    /// The roles of all meta nodes in the cluster.
    pub meta_type: IntGaugeVec,

    // ********************************** gRPC ************************************
    /// gRPC latency of meta services
    pub grpc_latency: HistogramVec,

    // ********************************** Barrier ************************************
    /// The duration from barrier injection to commit.
    /// It is the sum of inflight-latency, sync-latency and wait-commit-latency.
    pub barrier_latency: LabelGuardedHistogramVec,
    /// The duration from barrier complete to commit
    pub barrier_wait_commit_latency: Histogram,
    /// Latency between each barrier send
    pub barrier_send_latency: LabelGuardedHistogramVec,
    /// The number of all barriers. It is the sum of barriers that are in-flight or completed but
    /// waiting for other barriers
    pub all_barrier_nums: LabelGuardedIntGaugeVec,
    /// The number of in-flight barriers
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
    /// The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.
    pub last_committed_barrier_time: IntGaugeVec,
    /// The barrier interval of each database
    pub barrier_interval_by_database: GaugeVec,

    // ********************************** Snapshot Backfill ***************************
    /// The barrier latency in seconds of `table_id` and snapshot backfill `barrier_type`
    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec, // (table_id, barrier_type)
    /// The latency of commit epoch of `table_id`
    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec, // (table_id, )
    /// The lag between the upstream epoch and the downstream epoch.
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec, // (table_id, )
    /// The number of in-flight barriers of `table_id`
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec, // (table_id, )

    // ********************************** Recovery ************************************
    pub recovery_failure_cnt: IntCounterVec,
    pub recovery_latency: HistogramVec,

    // ********************************** Hummock ************************************
    /// Max committed epoch
    pub max_committed_epoch: IntGauge,
    /// Min committed epoch
    pub min_committed_epoch: IntGauge,
    /// The number of SSTs in each level
    pub level_sst_num: IntGaugeVec,
    /// The number of SSTs to be merged to the next level in each level
    pub level_compact_cnt: IntGaugeVec,
    /// The number of compact tasks
    pub compact_frequency: IntCounterVec,
    /// Size of each level
    pub level_file_size: IntGaugeVec,
    /// Hummock version size
    pub version_size: IntGauge,
    /// The version id of the current version.
    pub current_version_id: IntGauge,
    /// The version id of the checkpoint version.
    pub checkpoint_version_id: IntGauge,
    /// The smallest version id that is being pinned by worker nodes.
    pub min_pinned_version_id: IntGauge,
    /// The smallest version id that is being guarded by meta node safe points.
    pub min_safepoint_version_id: IntGauge,
    /// Compaction groups that are in the write-stop state.
    pub write_stop_compaction_groups: IntGaugeVec,
    /// The number of attempts to trigger full GC.
    pub full_gc_trigger_count: IntGauge,
    /// The number of candidate objects to delete after scanning the object store.
    pub full_gc_candidate_object_count: Histogram,
    /// The number of objects to delete after filtering by the meta node.
    pub full_gc_selected_object_count: Histogram,
    /// Hummock version stats
    pub version_stats: IntGaugeVec,
    /// Per materialized view stats in the current hummock version
    pub materialized_view_stats: IntGaugeVec,
    /// Total number of objects that are no longer referenced by versions.
    pub stale_object_count: IntGauge,
    /// Total size of objects that are no longer referenced by versions.
    pub stale_object_size: IntGauge,
    /// Total number of objects that are still referenced by non-current versions.
    pub old_version_object_count: IntGauge,
    /// Total size of objects that are still referenced by non-current versions.
    pub old_version_object_size: IntGauge,
    /// Total number of objects that are referenced by time travel.
    pub time_travel_object_count: IntGauge,
    /// Total number of objects that are referenced by the current version.
    pub current_version_object_count: IntGauge,
    /// Total size of objects that are referenced by the current version.
    pub current_version_object_size: IntGauge,
    /// Total number of objects, including dangling objects.
    pub total_object_count: IntGauge,
    /// Total size of objects, including dangling objects.
    pub total_object_size: IntGauge,
    /// Number of objects per table change log.
    pub table_change_log_object_count: IntGaugeVec,
    /// Size of objects per table change log.
    pub table_change_log_object_size: IntGaugeVec,
    /// Min epoch currently retained in the table change log.
    pub table_change_log_min_epoch: IntGaugeVec,
    /// The number of hummock version delta logs.
    pub delta_log_count: IntGauge,
    /// Latency of version checkpoint
    pub version_checkpoint_latency: Histogram,
    /// Latency for the hummock manager to acquire a lock
    pub hummock_manager_lock_time: HistogramVec,
    /// Latency for the hummock manager to actually process a request after acquiring the lock
    pub hummock_manager_real_process_time: HistogramVec,
    /// The number of compactions from one level to another level that have been skipped
    pub compact_skip_frequency: IntCounterVec,
    /// Bytes of the LSM tree needed to reach balance
    pub compact_pending_bytes: IntGaugeVec,
    /// Per level compression ratio
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    /// Per level number of running compaction tasks
    pub level_compact_task_cnt: IntGaugeVec,
    pub time_after_last_observation: Arc<AtomicU64>,
    pub l0_compact_level_count: HistogramVec,
    pub compact_task_size: HistogramVec,
    pub compact_task_file_count: HistogramVec,
    pub compact_task_batch_count: HistogramVec,
    pub split_compaction_group_count: IntCounterVec,
    pub state_table_count: IntGaugeVec,
    pub branched_sst_count: IntGaugeVec,
    pub compact_task_trivial_move_sst_count: HistogramVec,

    pub compaction_event_consumed_latency: Histogram,
    pub compaction_event_loop_iteration_latency: Histogram,
    pub time_travel_vacuum_metadata_latency: Histogram,
    pub time_travel_write_metadata_latency: Histogram,

    // ********************************** Object Store ************************************
    // Object store related metrics (for backup/restore and version checkpoint)
    pub object_store_metric: Arc<ObjectStoreMetrics>,

    // ********************************** Source ************************************
    /// Whether each supervised source is still up.
    pub source_is_up: LabelGuardedIntGaugeVec,
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

    // ********************************** Fragment ************************************
    /// A dummy gauge metric whose labels carry the mapping from actor id to fragment id
    pub actor_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry the mapping from table id to actor id
    pub table_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry the mapping from actor id to sink id
    pub sink_info: IntGaugeVec,
    /// A dummy gauge metric whose labels carry relation info
    pub relation_info: IntGaugeVec,
    /// Backfill progress per fragment
    pub backfill_fragment_progress: IntGaugeVec,

    // ********************************** System Params ************************************
    /// A dummy gauge metric with labels carrying system parameter info.
    /// Labels: (name, value)
    pub system_param_info: IntGaugeVec,

    /// Write throughput of commit epoch for each state table
    pub table_write_throughput: IntCounterVec,

    /// The number of compaction groups that have been triggered to merge
    pub merge_compaction_group_count: IntCounterVec,

    // ********************************** Auto Schema Change ************************************
    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_latency: LabelGuardedHistogramVec,

    pub time_travel_version_replay_latency: Histogram,

    pub compaction_group_count: IntGauge,
    pub compaction_group_size: IntGaugeVec,
    pub compaction_group_file_count: IntGaugeVec,
    pub compaction_group_throughput: IntGaugeVec,

    // ********************************** Refresh Manager ************************************
    pub refresh_job_duration: LabelGuardedUintGaugeVec,
    pub refresh_job_finish_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_trigger_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_miss_cnt: LabelGuardedIntCounterVec,
}

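/// Global [`MetaMetrics`] instance, registered against the global metrics
/// registry.
///
/// Hedged usage sketch: components clone the global handle and record
/// observations through it. The gRPC path label below is illustrative, not an
/// actual service path taken from this crate.
///
/// ```ignore
/// let metrics = GLOBAL_META_METRICS.clone();
/// metrics
///     .grpc_latency
///     .with_label_values(&["/meta.ClusterService/AddWorkerNode"]) // hypothetical label
///     .observe(0.004); // seconds
/// ```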
pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl MetaMetrics {
    fn new(registry: &Registry) -> Self {
        let opts = histogram_opts!(
            "meta_grpc_duration_seconds",
            "gRPC latency of meta services",
            exponential_buckets(0.0001, 2.0, 20).unwrap() // max 52s
        );
        let grpc_latency =
            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_duration_seconds",
            "barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let barrier_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();

        let opts = histogram_opts!(
            "meta_barrier_wait_commit_duration_seconds",
            "barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let barrier_wait_commit_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_send_duration_seconds",
            "barrier send latency",
            exponential_buckets(0.1, 1.5, 19).unwrap() // max 148s
        );
        let barrier_send_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();
        let barrier_interval_by_database = register_gauge_vec_with_registry!(
            "meta_barrier_interval_by_database",
            "barrier interval of each database",
            &["database_id"],
            registry
        )
        .unwrap();

        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "all_barrier_nums",
            "num of all barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "in_flight_barrier_nums",
            "num of in-flight barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
            "last_committed_barrier_time",
            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
            &["database_id"],
            registry
        )
        .unwrap();

        // snapshot backfill metrics
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_duration_seconds",
            "snapshot backfill barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "barrier_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
            "snapshot backfill barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let snapshot_backfill_wait_commit_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();

        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_upstream_lag",
            "snapshot backfill upstream_lag",
            &["table_id"],
            registry
        )
        .unwrap();
        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_inflight_barrier_num",
            "snapshot backfill inflight_barrier_num",
            &["table_id"],
            registry
        )
        .unwrap();

        let max_committed_epoch = register_int_gauge_with_registry!(
            "storage_max_committed_epoch",
            "max committed epoch",
            registry
        )
        .unwrap();

        let min_committed_epoch = register_int_gauge_with_registry!(
            "storage_min_committed_epoch",
            "min committed epoch",
            registry
        )
        .unwrap();

        let level_sst_num = register_int_gauge_vec_with_registry!(
            "storage_level_sst_num",
            "num of SSTs in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let level_compact_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_cnt",
            "num of SSTs to be merged to next level in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let compact_frequency = register_int_counter_vec_with_registry!(
            "storage_level_compact_frequency",
            "The number of compactions from one level to another level that have completed or failed.",
            &["compactor", "group", "task_type", "result"],
            registry
        )
        .unwrap();
        let compact_skip_frequency = register_int_counter_vec_with_registry!(
            "storage_skip_compact_frequency",
            "The number of compactions from one level to another level that have been skipped.",
            &["level", "type"],
            registry
        )
        .unwrap();

        let version_size =
            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
                .unwrap();

        let current_version_id = register_int_gauge_with_registry!(
            "storage_current_version_id",
            "current version id",
            registry
        )
        .unwrap();

        let checkpoint_version_id = register_int_gauge_with_registry!(
            "storage_checkpoint_version_id",
            "checkpoint version id",
            registry
        )
        .unwrap();

        let min_pinned_version_id = register_int_gauge_with_registry!(
            "storage_min_pinned_version_id",
            "min pinned version id",
            registry
        )
        .unwrap();

        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
            "storage_write_stop_compaction_groups",
            "compaction groups in write stop state",
            &["compaction_group_id"],
            registry
        )
        .unwrap();

        let full_gc_trigger_count = register_int_gauge_with_registry!(
            "storage_full_gc_trigger_count",
            "the number of attempts to trigger full GC",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_candidate_object_count",
            "the number of candidate objects to delete after scanning the object store",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_candidate_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_selected_object_count",
            "the number of objects to delete after filtering by the meta node",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_selected_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let min_safepoint_version_id = register_int_gauge_with_registry!(
            "storage_min_safepoint_version_id",
            "min safepoint version id",
            registry
        )
        .unwrap();

        let level_file_size = register_int_gauge_vec_with_registry!(
            "storage_level_total_file_size",
            "total file size (KB) in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let version_stats = register_int_gauge_vec_with_registry!(
            "storage_version_stats",
            "per table stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let materialized_view_stats = register_int_gauge_vec_with_registry!(
            "storage_materialized_view_stats",
            "per materialized view stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let stale_object_count = register_int_gauge_with_registry!(
            "storage_stale_object_count",
            "total number of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let stale_object_size = register_int_gauge_with_registry!(
            "storage_stale_object_size",
            "total size of objects that are no longer referenced by versions.",
            registry
        )
        .unwrap();

        let old_version_object_count = register_int_gauge_with_registry!(
            "storage_old_version_object_count",
            "total number of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let old_version_object_size = register_int_gauge_with_registry!(
            "storage_old_version_object_size",
            "total size of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let current_version_object_count = register_int_gauge_with_registry!(
            "storage_current_version_object_count",
            "total number of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let current_version_object_size = register_int_gauge_with_registry!(
            "storage_current_version_object_size",
            "total size of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let total_object_count = register_int_gauge_with_registry!(
            "storage_total_object_count",
            "Total number of objects, including dangling objects. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let total_object_size = register_int_gauge_with_registry!(
            "storage_total_object_size",
            "Total size of objects, including dangling objects. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_count",
            "per table change log object count",
            &["table_id"],
            registry
        )
        .unwrap();

        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_size",
            "per table change log object size",
            &["table_id"],
            registry
        )
        .unwrap();

        let table_change_log_min_epoch = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_min_epoch",
            "min epoch currently retained in table change log",
            &["table_id"],
            registry
        )
        .unwrap();

        let time_travel_object_count = register_int_gauge_with_registry!(
            "storage_time_travel_object_count",
            "total number of objects that are referenced by time travel.",
            registry
        )
        .unwrap();

        let delta_log_count = register_int_gauge_with_registry!(
            "storage_delta_log_count",
            "total number of hummock version delta logs",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_version_checkpoint_latency",
            "hummock version checkpoint latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();

        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
            "hummock_manager_lock_time",
            "latency for hummock manager to acquire the rwlock",
            &["lock_name", "lock_type"],
            registry
        )
        .unwrap();

        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
            "meta_hummock_manager_real_process_time",
            "latency for hummock manager to really process the request",
            &["method"],
            registry
        )
        .unwrap();

        let worker_num = register_int_gauge_vec_with_registry!(
            "worker_num",
            "number of nodes in the cluster",
            &["worker_type"],
            registry,
        )
        .unwrap();

        let meta_type = register_int_gauge_vec_with_registry!(
            "meta_num",
            "role of meta nodes in the cluster",
            &["worker_addr", "role"],
            registry,
        )
        .unwrap();

        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
            "storage_compact_pending_bytes",
            "bytes of lsm tree needed to reach balance",
            &["group"],
            registry
        )
        .unwrap();

        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
            "storage_compact_level_compression_ratio",
            "compression ratio of each level of the lsm tree",
            &["group", "level", "algorithm"],
            registry
        )
        .unwrap();

        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_task_cnt",
            "num of compact_task organized by group and level",
            &["task"],
            registry
        )
        .unwrap();

        let time_travel_vacuum_metadata_latency = register_histogram_with_registry!(
            histogram_opts!(
                "storage_time_travel_vacuum_metadata_latency",
                "Latency of vacuuming metadata for time travel",
                exponential_buckets(0.1, 1.5, 20).unwrap()
            ),
            registry
        )
        .unwrap();
        let time_travel_write_metadata_latency = register_histogram_with_registry!(
            histogram_opts!(
                "storage_time_travel_write_metadata_latency",
                "Latency of writing metadata for time travel",
                exponential_buckets(0.1, 1.5, 20).unwrap()
            ),
            registry
        )
        .unwrap();

        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());

        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
            "recovery_failure_cnt",
            "Number of failed recovery attempts",
            &["recovery_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "recovery_latency",
            "Latency of the recovery process",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let recovery_latency =
            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();

        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_failure_cnt",
            "Number of failed auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_success_cnt",
            "Number of successful auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "auto_schema_change_latency",
            "Latency of the auto schema change process",
            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
        );
        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
            "source_status_is_up",
            "source is up or not",
            &["source_id", "source_name"],
            registry
        )
        .unwrap();
        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());

        let actor_info = register_int_gauge_vec_with_registry!(
            "actor_info",
            "Mapping from actor id to (fragment id, compute node)",
            &["actor_id", "fragment_id", "compute_node"],
            registry
        )
        .unwrap();

        let table_info = register_int_gauge_vec_with_registry!(
            "table_info",
            "Mapping from table id to (actor id, table name)",
            &[
                "materialized_view_id",
                "table_id",
                "fragment_id",
                "table_name",
                "table_type",
                "compaction_group_id"
            ],
            registry
        )
        .unwrap();

        let sink_info = register_int_gauge_vec_with_registry!(
            "sink_info",
            "Mapping from actor id to (sink id, sink name)",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let relation_info = register_int_gauge_vec_with_registry!(
            "relation_info",
            "Information of the database relation (table/source/sink/materialized view/index/internal)",
            &["id", "database", "schema", "name", "resource_group", "type"],
            registry
        )
        .unwrap();

        let backfill_fragment_progress = register_int_gauge_vec_with_registry!(
            "backfill_fragment_progress",
            "Backfill progress per fragment",
            &[
                "job_id",
                "fragment_id",
                "backfill_state_table_id",
                "backfill_target_relation_id",
                "backfill_target_relation_name",
                "backfill_target_relation_type",
                "backfill_type",
                "backfill_epoch",
                "upstream_type",
                "backfill_progress",
            ],
            registry
        )
        .unwrap();

        let l0_compact_level_count = register_histogram_vec_with_registry!(
            "storage_l0_compact_level_count",
            "level_count of l0 compact task",
            &["group", "type"],
            registry
        )
        .unwrap();

        // System parameter info
        let system_param_info = register_int_gauge_vec_with_registry!(
            "system_param_info",
            "Information of system parameters",
            &["name", "value"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_size",
            "Total size of compact tasks that have been issued to the state store",
            exponential_buckets(1048576.0, 2.0, 16).unwrap()
        );

        let compact_task_size =
            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();

        let compact_task_file_count = register_histogram_vec_with_registry!(
            "storage_compact_task_file_count",
            "file count of compact task",
            &["group", "type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "storage_compact_task_batch_count",
            "count of compact task batch",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_batch_count =
            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();

        let table_write_throughput = register_int_counter_vec_with_registry!(
            "storage_commit_write_throughput",
            "The write throughput of commit epoch for each state table.",
            &["table_id"],
            registry
        )
        .unwrap();

        let split_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_split_compaction_group_count",
            "Count of triggered compaction group splits",
            &["group"],
            registry
        )
        .unwrap();

        let state_table_count = register_int_gauge_vec_with_registry!(
            "storage_state_table_count",
            "Count of state tables per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let branched_sst_count = register_int_gauge_vec_with_registry!(
            "storage_branched_sst_count",
            "Count of branched ssts per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_consumed_latency",
            "The latency (ms) of each event being consumed",
            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
        );
        let compaction_event_consumed_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_loop_iteration_latency",
            "The latency (ms) of each iteration of the compaction event loop",
            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
        );
        let compaction_event_loop_iteration_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_merge_compaction_group_count",
            "Count of triggered compaction group merges",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_time_travel_version_replay_latency",
            "The latency (ms) of replaying a hummock version for time travel",
            exponential_buckets(0.01, 10.0, 6).unwrap()
        );
        let time_travel_version_replay_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let compaction_group_count = register_int_gauge_with_registry!(
            "storage_compaction_group_count",
            "The number of compaction groups",
            registry,
        )
        .unwrap();

        let compaction_group_size = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_size",
            "The size of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_file_count",
            "The file count of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_throughput",
            "The throughput of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_trivial_move_sst_count",
            "sst count of compact trivial-move task",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_trivial_move_sst_count =
            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();

        let refresh_job_duration = register_guarded_uint_gauge_vec_with_registry!(
            "meta_refresh_job_duration",
            "The duration of refresh jobs",
            &["table_id", "status"],
            registry
        )
        .unwrap();
        let refresh_job_finish_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_job_finish_cnt",
            "The number of finished refresh jobs",
            &["table_id", "status"],
            registry
        )
        .unwrap();
        let refresh_cron_job_trigger_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_cron_job_trigger_cnt",
            "The number of cron refresh jobs triggered",
            &["table_id"],
            registry
        )
        .unwrap();
        let refresh_cron_job_miss_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_cron_job_miss_cnt",
            "The number of cron refresh jobs missed",
            &["table_id"],
            registry
        )
        .unwrap();

        Self {
            grpc_latency,
            barrier_latency,
            barrier_wait_commit_latency,
            barrier_send_latency,
            all_barrier_nums,
            in_flight_barrier_nums,
            last_committed_barrier_time,
            barrier_interval_by_database,
            snapshot_backfill_barrier_latency,
            snapshot_backfill_wait_commit_latency,
            snapshot_backfill_lag,
            snapshot_backfill_inflight_barrier_num,
            recovery_failure_cnt,
            recovery_latency,

            max_committed_epoch,
            min_committed_epoch,
            level_sst_num,
            level_compact_cnt,
            compact_frequency,
            compact_skip_frequency,
            level_file_size,
            version_size,
            version_stats,
            materialized_view_stats,
            stale_object_count,
            stale_object_size,
            old_version_object_count,
            old_version_object_size,
            time_travel_object_count,
            current_version_object_count,
            current_version_object_size,
            total_object_count,
            total_object_size,
            table_change_log_object_count,
            table_change_log_object_size,
            table_change_log_min_epoch,
            delta_log_count,
            version_checkpoint_latency,
            current_version_id,
            checkpoint_version_id,
            min_pinned_version_id,
            min_safepoint_version_id,
            write_stop_compaction_groups,
            full_gc_trigger_count,
            full_gc_candidate_object_count,
            full_gc_selected_object_count,
            hummock_manager_lock_time,
            hummock_manager_real_process_time,
            time_after_last_observation: Arc::new(AtomicU64::new(0)),
            worker_num,
            meta_type,
            compact_pending_bytes,
            compact_level_compression_ratio,
            level_compact_task_cnt,
            object_store_metric,
            source_is_up,
            source_enumerator_metrics,
            actor_info,
            table_info,
            sink_info,
            relation_info,
            backfill_fragment_progress,
            system_param_info,
            l0_compact_level_count,
            compact_task_size,
            compact_task_file_count,
            compact_task_batch_count,
            compact_task_trivial_move_sst_count,
            table_write_throughput,
            split_compaction_group_count,
            state_table_count,
            branched_sst_count,
            compaction_event_consumed_latency,
            compaction_event_loop_iteration_latency,
            auto_schema_change_failure_cnt,
            auto_schema_change_success_cnt,
            auto_schema_change_latency,
            merge_compaction_group_count,
            time_travel_version_replay_latency,
            compaction_group_count,
            compaction_group_size,
            compaction_group_file_count,
            compaction_group_throughput,
            refresh_job_duration,
            refresh_job_finish_cnt,
            refresh_cron_job_trigger_cnt,
            refresh_cron_job_miss_cnt,
            time_travel_vacuum_metadata_latency,
            time_travel_write_metadata_latency,
        }
    }

    #[cfg(test)]
    pub fn for_test(registry: &Registry) -> Self {
        Self::new(registry)
    }
}
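
// Hedged test-side sketch: `for_test` registers against a caller-owned
// registry, so unit tests can assert on gathered metric families without
// touching the global registry (the label value below is illustrative):
//
// #[test]
// fn worker_num_is_collected() {
//     let registry = prometheus::Registry::new();
//     let metrics = MetaMetrics::for_test(&registry);
//     metrics.worker_num.with_label_values(&["COMPUTE_NODE"]).set(1);
//     assert!(!registry.gather().is_empty());
// }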
impl Default for MetaMetrics {
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}

/// Refresh `system_param_info` metrics by reading current system parameters.
pub async fn refresh_system_param_info_metrics(
    system_params_controller: &SystemParamsControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let params_info = system_params_controller.get_params().await.get_all();

    meta_metrics.system_param_info.reset();
    for info in params_info {
        meta_metrics
            .system_param_info
            .with_label_values(&[info.name, &info.value])
            .set(1);
    }
}

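// Hedged sketch of the resulting Prometheus exposition format (the parameter
// name and value below are illustrative): every system parameter becomes one
// `1`-valued series, so current settings can be read off the label set:
//
//   system_param_info{name="barrier_interval_ms",value="1000"} 1
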
pub fn start_worker_info_monitor(
    metadata_manager: MetadataManager,
    election_client: ElectionClientRef,
    interval: Duration,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval = tokio::time::interval(interval);
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for interval
                _ = monitor_interval.tick() => {},
                // Shutdown monitor
                _ = &mut shutdown_rx => {
                    tracing::info!("Worker number monitor is stopped");
                    return;
                }
            }

            let node_map = match metadata_manager.count_worker_node().await {
                Ok(node_map) => node_map,
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
                    continue;
                }
            };

            // Reset metrics to clean up stale labels, e.g. invalid lease ids
            meta_metrics.worker_num.reset();
            meta_metrics.meta_type.reset();

            for (worker_type, worker_num) in node_map {
                meta_metrics
                    .worker_num
                    .with_label_values(&[(worker_type.as_str_name())])
                    .set(worker_num as i64);
            }
            if let Ok(meta_members) = election_client.get_members().await {
                meta_metrics
                    .worker_num
                    .with_label_values(&[WorkerType::Meta.as_str_name()])
                    .set(meta_members.len() as i64);
                meta_members.into_iter().for_each(|m| {
                    let role = if m.is_leader { "leader" } else { "follower" };
                    meta_metrics
                        .meta_type
                        .with_label_values(&[m.id.as_str(), role])
                        .set(1);
                });
            }
        }
    });

    (join_handle, shutdown_tx)
}

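// Hedged caller-side sketch (wiring is illustrative; the real call site lives
// elsewhere in the meta node bootstrap):
//
// let (handle, shutdown_tx) = start_worker_info_monitor(
//     metadata_manager,
//     election_client,
//     Duration::from_secs(30), // illustrative interval
//     Arc::new(MetaMetrics::default()),
// );
// // ... later, on shutdown:
// let _ = shutdown_tx.send(());
// let _ = handle.await;
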
pub async fn refresh_fragment_info_metrics(
    catalog_controller: &CatalogControllerRef,
    cluster_controller: &ClusterControllerRef,
    hummock_manager: &HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let worker_nodes = match cluster_controller
        .list_workers(Some(WorkerType::ComputeNode.into()), None)
        .await
    {
        Ok(worker_nodes) => worker_nodes,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list worker node");
            return;
        }
    };
    let actor_locations = match catalog_controller.list_actor_locations() {
        Ok(actor_locations) => actor_locations,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
            return;
        }
    };
    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
        Ok(sink_actor_mapping) => sink_actor_mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
            return;
        }
    };
    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
        Ok(fragment_state_tables) => fragment_state_tables,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
            return;
        }
    };
    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
        Ok(mapping) => mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
            return;
        }
    };

    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
        .into_iter()
        .map(|worker_node| {
            let addr = match worker_node.host {
                Some(host) => format!("{}:{}", host.host, host.port),
                None => "".to_owned(),
            };
            (worker_node.id, addr)
        })
        .collect();
    let table_compaction_group_id_mapping = hummock_manager
        .get_table_compaction_group_id_mapping()
        .await;

    // Start fresh with a reset to clear all outdated labels. This is safe since we always
    // report full info on each interval.
    meta_metrics.actor_info.reset();
    meta_metrics.table_info.reset();
    meta_metrics.sink_info.reset();
    for actor_location in actor_locations {
        let actor_id_str = actor_location.actor_id.to_string();
        let fragment_id_str = actor_location.fragment_id.to_string();
        // Report a dummy gauge metric with (actor id, fragment id, node
        // address) as its labels
        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
            meta_metrics
                .actor_info
                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
                .set(1);
        }
    }
    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
        let sink_id_str = sink_id.to_string();
        for actor_id in actor_ids {
            let actor_id_str = actor_id.to_string();
            meta_metrics
                .sink_info
                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
                .set(1);
        }
    }
    for PartialFragmentStateTables {
        fragment_id,
        job_id,
        state_table_ids,
    } in fragment_state_tables
    {
        let fragment_id_str = fragment_id.to_string();
        let job_id_str = job_id.to_string();
        for table_id in state_table_ids.into_inner() {
            let table_id_str = table_id.to_string();
            let (table_name, table_type) = table_name_and_type_mapping
                .get(&table_id)
                .cloned()
                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
            let compaction_group_id = table_compaction_group_id_mapping
                .get(&table_id)
                .map(|cg_id| cg_id.to_string())
                .unwrap_or_else(|| "unknown".to_owned());
            meta_metrics
                .table_info
                .with_label_values(&[
                    &job_id_str,
                    &table_id_str,
                    &fragment_id_str,
                    &table_name,
                    &table_type,
                    &compaction_group_id,
                ])
                .set(1);
        }
    }
}

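// Hedged sketch of a periodic driver (interval and wiring are illustrative;
// the real call sites live in the meta node's monitoring setup):
//
// let mut ticker = tokio::time::interval(Duration::from_secs(60));
// loop {
//     ticker.tick().await;
//     refresh_fragment_info_metrics(
//         &catalog_controller,
//         &cluster_controller,
//         &hummock_manager,
//         meta_metrics.clone(),
//     )
//     .await;
//     refresh_relation_info_metrics(&catalog_controller, meta_metrics.clone()).await;
// }
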
pub async fn refresh_relation_info_metrics(
    catalog_controller: &CatalogControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let table_objects = match catalog_controller.list_table_objects().await {
        Ok(table_objects) => table_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table objects");
            return;
        }
    };

    let source_objects = match catalog_controller.list_source_objects().await {
        Ok(source_objects) => source_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get source objects");
            return;
        }
    };

    let sink_objects = match catalog_controller.list_sink_objects().await {
        Ok(sink_objects) => sink_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
            return;
        }
    };

    meta_metrics.relation_info.reset();

    for (id, db, schema, name, resource_group, table_type) in table_objects {
        let relation_type = match table_type {
            TableType::Table => "table",
            TableType::MaterializedView => "materialized_view",
            TableType::Index | TableType::VectorIndex => "index",
            TableType::Internal => "internal",
        };
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                &relation_type.to_owned(),
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in source_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                &"source".to_owned(),
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in sink_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                &"sink".to_owned(),
            ])
            .set(1);
    }
}

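// Hedged sketch of one resulting `relation_info` series (all label values
// illustrative), showing how the dummy gauge encodes catalog info:
//
//   relation_info{id="1001",database="dev",schema="public",name="t",
//                 resource_group="default",type="table"} 1
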
fn extract_backfill_fragment_info(
    distribution: &FragmentDistribution,
) -> Option<BackfillFragmentInfo> {
    let backfill_type =
        if distribution.fragment_type_mask & FragmentTypeFlag::SourceScan as u32 != 0 {
            "SOURCE"
        } else if distribution.fragment_type_mask
            & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32
                | FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32)
            != 0
        {
            "SNAPSHOT_BACKFILL"
        } else if distribution.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0 {
            "ARRANGEMENT_OR_NO_SHUFFLE"
        } else {
            return None;
        };

    let stream_node = distribution.node.as_ref()?;
    let mut info = None;
    match backfill_type {
        "SOURCE" => {
            visit_stream_node_source_backfill(stream_node, |node| {
                info = Some(BackfillFragmentInfo {
                    job_id: distribution.table_id.as_raw_id(),
                    fragment_id: distribution.fragment_id.as_raw_id(),
                    backfill_state_table_id: node
                        .state_table
                        .as_ref()
                        .map(|table| table.id.as_raw_id())
                        .unwrap_or_default(),
                    backfill_target_relation_id: node.upstream_source_id.as_raw_id(),
                    backfill_type,
                    backfill_epoch: 0,
                });
            });
        }
        "SNAPSHOT_BACKFILL" | "ARRANGEMENT_OR_NO_SHUFFLE" => {
            visit_stream_node_stream_scan(stream_node, |node| {
                info = Some(BackfillFragmentInfo {
                    job_id: distribution.table_id.as_raw_id(),
                    fragment_id: distribution.fragment_id.as_raw_id(),
                    backfill_state_table_id: node
                        .state_table
                        .as_ref()
                        .map(|table| table.id.as_raw_id())
                        .unwrap_or_default(),
                    backfill_target_relation_id: node.table_id.as_raw_id(),
                    backfill_type,
                    backfill_epoch: node.snapshot_backfill_epoch.unwrap_or_default(),
                });
            });
        }
        _ => {}
    }

    info
}

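// Worked example of the classification above: for a fragment whose
// `fragment_type_mask` has both the `SnapshotBackfillStreamScan` and the plain
// `StreamScan` bits set, "SNAPSHOT_BACKFILL" wins, because each bitwise-AND
// test runs in order and the snapshot branch is checked before `StreamScan`.
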
pub async fn refresh_backfill_progress_metrics(
    catalog_controller: &CatalogControllerRef,
    hummock_manager: &HummockManagerRef,
    barrier_manager: &BarrierManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let fragment_descs = match catalog_controller.list_fragment_descs_with_node(true).await {
        Ok(fragment_descs) => fragment_descs,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "failed to list creating fragment descs");
            return;
        }
    };

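    // Index backfill fragments by (job_id, fragment_id) so they can be joined
    // with the per-fragment progress reported by the barrier manager.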
    let backfill_infos: HashMap<(u32, u32), BackfillFragmentInfo> = fragment_descs
        .iter()
        .filter_map(|(distribution, _)| extract_backfill_fragment_info(distribution))
        .map(|info| ((info.job_id, info.fragment_id), info))
        .collect();

    let fragment_progresses = match barrier_manager.get_fragment_backfill_progress().await {
        Ok(progress) => progress,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "failed to get fragment backfill progress");
            return;
        }
    };

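    // Key the reported progress by (job_id, fragment_id) to match the map above.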
    let progress_by_fragment: HashMap<(u32, u32), _> = fragment_progresses
        .into_iter()
        .map(|progress| {
            (
                (
                    progress.job_id.as_raw_id(),
                    progress.fragment_id.as_raw_id(),
                ),
                progress,
            )
        })
        .collect();

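    // Per-table total key counts from the hummock version stats serve as the
    // denominator when rendering scan-based backfill progress as a percentage.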
    let version_stats = hummock_manager.get_version_stats().await;

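    // Resolve the backfill targets to human-readable (db, schema, name, type)
    // tuples for the metric labels.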
    let relation_ids: HashSet<_> = backfill_infos
        .values()
        .map(|info| ObjectId::new(info.backfill_target_relation_id))
        .collect();
    let relation_objects = match catalog_controller
        .list_relation_objects_by_ids(&relation_ids)
        .await
    {
        Ok(relation_objects) => relation_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "failed to get relation objects");
            return;
        }
    };

    let mut relation_info = HashMap::new();
    for (id, db, schema, name, rel_type) in relation_objects {
        relation_info.insert(id.as_raw_id(), (db, schema, name, rel_type));
    }

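    // Reset the whole gauge family before repopulating it, so that series for
    // fragments that have finished or been dropped disappear.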
    meta_metrics.backfill_fragment_progress.reset();

    for info in backfill_infos.values() {
        let progress = progress_by_fragment.get(&(info.job_id, info.fragment_id));
        let (db, schema, name, rel_type) = relation_info
            .get(&info.backfill_target_relation_id)
            .cloned()
            .unwrap_or_else(|| {
                (
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                )
            });

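        // Materialize owned label values; `with_label_values` borrows them below.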
        let job_id_str = info.job_id.to_string();
        let fragment_id_str = info.fragment_id.to_string();
        let backfill_state_table_id_str = info.backfill_state_table_id.to_string();
        let backfill_target_relation_id_str = info.backfill_target_relation_id.to_string();
        let backfill_target_relation_name_str = format!("{db}.{schema}.{name}");
        let backfill_target_relation_type_str = rel_type;
        let backfill_type_str = info.backfill_type.to_owned();
        let backfill_epoch_str = info.backfill_epoch.to_string();
        let total_key_count = version_stats
            .table_stats
            .get(&TableId::new(info.backfill_target_relation_id))
            .map(|stats| stats.total_key_count as u64);

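        // Encode the progress itself in a label: source backfill only reports
        // consumed rows, while scan-based backfill can be shown as a percentage
        // of the target table's total key count.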
        let progress_label = match (info.backfill_type, progress) {
            ("SOURCE", Some(progress)) => format!("{} consumed rows", progress.consumed_rows),
            ("SOURCE", None) => "0 consumed rows".to_owned(),
            (_, Some(progress)) if progress.done => {
                let total = total_key_count.unwrap_or(0);
                format!("100.0000% ({}/{})", total, total)
            }
            (_, Some(progress)) => {
                let total = total_key_count.unwrap_or(0);
                if total == 0 {
                    // A missing or zero key count is treated as fully consumed.
                    "100.0000% (0/0)".to_owned()
                } else {
                    // Consumed rows can exceed the reported key count, so cap the
                    // displayed percentage at 100%.
                    let raw = (progress.consumed_rows as f64) / (total as f64) * 100.0;
                    format!(
                        "{:.4}% ({}/{})",
                        raw.min(100.0),
                        progress.consumed_rows,
                        total
                    )
                }
            }
            (_, None) => "0.0000% (0/0)".to_owned(),
        };
        let upstream_type_str = progress
            .map(|progress| progress.upstream_type.to_string())
            .unwrap_or_else(|| "Unknown".to_owned());

        meta_metrics
            .backfill_fragment_progress
            .with_label_values(&[
                &job_id_str,
                &fragment_id_str,
                &backfill_state_table_id_str,
                &backfill_target_relation_id_str,
                &backfill_target_relation_name_str,
                &backfill_target_relation_type_str,
                &backfill_type_str,
                &backfill_epoch_str,
                &upstream_type_str,
                &progress_label,
            ])
            .set(1);
    }
}

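/// Starts a background task that periodically refreshes fragment, relation,
/// backfill-progress and system-parameter metrics (every
/// `COLLECT_INTERVAL_SECONDS`, i.e. once a minute).
///
/// Returns the task's `JoinHandle` and a oneshot `Sender`; sending on (or
/// dropping) the sender stops the monitor.
///
/// A minimal usage sketch, assuming the managers are already constructed:
///
/// ```ignore
/// let (handle, shutdown) = start_info_monitor(
///     metadata_manager,
///     hummock_manager,
///     barrier_manager,
///     system_params_controller,
///     meta_metrics,
/// );
/// // ... on meta node shutdown:
/// let _ = shutdown.send(());
/// handle.await.unwrap();
/// ```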
pub fn start_info_monitor(
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    barrier_manager: BarrierManagerRef,
    system_params_controller: SystemParamsControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    const COLLECT_INTERVAL_SECONDS: u64 = 60;

    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval =
            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Wait for interval
                _ = monitor_interval.tick() => {},
                // Shutdown monitor
                _ = &mut shutdown_rx => {
                    tracing::info!("Meta info monitor is stopped");
                    return;
                }
            }

            // Fragment and relation info
            refresh_fragment_info_metrics(
                &metadata_manager.catalog_controller,
                &metadata_manager.cluster_controller,
                &hummock_manager,
                meta_metrics.clone(),
            )
            .await;

            refresh_relation_info_metrics(
                &metadata_manager.catalog_controller,
                meta_metrics.clone(),
            )
            .await;

            refresh_backfill_progress_metrics(
                &metadata_manager.catalog_controller,
                &hummock_manager,
                &barrier_manager,
                meta_metrics.clone(),
            )
            .await;

            // System parameter info
            refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
                .await;
        }
    });

    (join_handle, shutdown_tx)
}