risingwave_meta/rpc/
metrics.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::atomic::AtomicU64;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicF64, GenericGaugeVec};
21use prometheus::{
22    GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
23    exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
24    register_histogram_vec_with_registry, register_histogram_with_registry,
25    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
26    register_int_gauge_with_registry,
27};
28use risingwave_common::catalog::{FragmentTypeFlag, TableId};
29use risingwave_common::metrics::{
30    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
31    LabelGuardedUintGaugeVec,
32};
33use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
34use risingwave_common::system_param::reader::SystemParamsRead;
35use risingwave_common::util::stream_graph_visitor::{
36    visit_stream_node_source_backfill, visit_stream_node_stream_scan,
37};
38use risingwave_common::{
39    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
40    register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
41};
42use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
43use risingwave_meta_model::table::TableType;
44use risingwave_meta_model::{ObjectId, WorkerId};
45use risingwave_object_store::object::object_metrics::{
46    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
47};
48use risingwave_pb::common::WorkerType;
49use risingwave_pb::meta::FragmentDistribution;
50use thiserror_ext::AsReport;
51use tokio::sync::oneshot::Sender;
52use tokio::task::JoinHandle;
53
54use crate::barrier::BarrierManagerRef;
55use crate::controller::catalog::CatalogControllerRef;
56use crate::controller::cluster::ClusterControllerRef;
57use crate::controller::system_param::SystemParamsControllerRef;
58use crate::controller::utils::PartialFragmentStateTables;
59use crate::hummock::HummockManagerRef;
60use crate::manager::MetadataManager;
61use crate::rpc::ElectionClientRef;
62
/// Identifying details of one backfilling fragment.
///
/// The field names mirror label names of the `backfill_fragment_progress` gauge registered in
/// `MetaMetrics::new`. NOTE(review): presumably each value becomes one label set of that
/// gauge — confirm at the construction site.
struct BackfillFragmentInfo {
    /// Job id the fragment belongs to.
    job_id: u32,
    /// Fragment id.
    fragment_id: u32,
    /// Backfill epoch.
    backfill_epoch: u64,
    /// Id of the backfill state table.
    backfill_state_table_id: u32,
    /// Id of the relation targeted by the backfill.
    backfill_target_relation_id: u32,
    /// Backfill type, kept as a `'static` string so it can be used directly as a label value.
    backfill_type: &'static str,
}
71
72#[derive(Clone)]
73pub struct MetaMetrics {
74    // ********************************** Meta ************************************
75    /// The number of workers in the cluster.
76    pub worker_num: IntGaugeVec,
77    /// The roles of all meta nodes in the cluster.
78    pub meta_type: IntGaugeVec,
79
80    // ********************************** gRPC ************************************
81    /// gRPC latency of meta services
82    pub grpc_latency: HistogramVec,
83
84    // ********************************** Barrier ************************************
85    /// The duration from barrier injection to commit
86    /// It is the sum of inflight-latency, sync-latency and wait-commit-latency
87    pub barrier_latency: LabelGuardedHistogramVec,
88    /// The duration from barrier complete to commit
89    pub barrier_wait_commit_latency: Histogram,
90    /// Latency between each barrier send
91    pub barrier_send_latency: LabelGuardedHistogramVec,
92    /// The number of all barriers. It is the sum of barriers that are in-flight or completed but
93    /// waiting for other barriers
94    pub all_barrier_nums: LabelGuardedIntGaugeVec,
95    /// The number of in-flight barriers
96    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
97    /// The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.
98    pub last_committed_barrier_time: IntGaugeVec,
99    /// The barrier interval of each database
100    pub barrier_interval_by_database: GaugeVec,
101
102    // ********************************** Snapshot Backfill ***************************
103    /// The barrier latency in second of `table_id` and snapshto backfill `barrier_type`
104    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec, // (table_id, barrier_type)
105    /// The lags between the upstream epoch and the downstream epoch.
106    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec, // (table_id, )
107    /// The number of inflight barriers of `table_id`
108    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec, // (table_id, _)
109
110    // ********************************** Recovery ************************************
111    pub recovery_failure_cnt: IntCounterVec,
112    pub recovery_latency: HistogramVec,
113
114    // ********************************** Hummock ************************************
115    /// Max committed epoch
116    pub max_committed_epoch: IntGauge,
117    /// Min committed epoch
118    pub min_committed_epoch: IntGauge,
119    /// The number of SSTs in each level
120    pub level_sst_num: IntGaugeVec,
121    /// The number of SSTs to be merged to next level in each level
122    pub level_compact_cnt: IntGaugeVec,
123    /// The number of compact tasks
124    pub compact_frequency: IntCounterVec,
125    /// Size of each level
126    pub level_file_size: IntGaugeVec,
127    /// Hummock version size
128    pub version_size: IntGauge,
129    /// The version Id of current version.
130    pub current_version_id: IntGauge,
131    /// The version id of checkpoint version.
132    pub checkpoint_version_id: IntGauge,
133    /// The smallest version id that is being pinned by worker nodes.
134    pub min_pinned_version_id: IntGauge,
135    /// The smallest version id that is being guarded by meta node safe points.
136    pub min_safepoint_version_id: IntGauge,
137    /// Compaction groups that is in write stop state.
138    pub write_stop_compaction_groups: IntGaugeVec,
139    /// The number of attempts to trigger full GC.
140    pub full_gc_trigger_count: IntGauge,
141    /// The number of candidate object to delete after scanning object store.
142    pub full_gc_candidate_object_count: Histogram,
143    /// The number of object to delete after filtering by meta node.
144    pub full_gc_selected_object_count: Histogram,
145    /// Hummock version stats
146    pub version_stats: IntGaugeVec,
147    /// Hummock version stats
148    pub materialized_view_stats: IntGaugeVec,
149    /// Total number of objects that is no longer referenced by versions.
150    pub stale_object_count: IntGauge,
151    /// Total size of objects that is no longer referenced by versions.
152    pub stale_object_size: IntGauge,
153    /// Total number of objects that is still referenced by non-current versions.
154    pub old_version_object_count: IntGauge,
155    /// Total size of objects that is still referenced by non-current versions.
156    pub old_version_object_size: IntGauge,
157    /// Total number of objects that is referenced by time travel.
158    pub time_travel_object_count: IntGauge,
159    /// Total number of objects that is referenced by current version.
160    pub current_version_object_count: IntGauge,
161    /// Total size of objects that is referenced by current version.
162    pub current_version_object_size: IntGauge,
163    /// Total number of objects that includes dangling objects.
164    pub total_object_count: IntGauge,
165    /// Total size of objects that includes dangling objects.
166    pub total_object_size: IntGauge,
167    /// Number of objects per table change log.
168    pub table_change_log_object_count: IntGaugeVec,
169    /// Size of objects per table change log.
170    pub table_change_log_object_size: IntGaugeVec,
171    /// Min epoch currently retained in table change log.
172    pub table_change_log_min_epoch: IntGaugeVec,
173    /// The number of hummock version delta log.
174    pub delta_log_count: IntGauge,
175    /// latency of version checkpoint
176    pub version_checkpoint_latency: Histogram,
177    /// Latency for hummock manager to acquire lock
178    pub hummock_manager_lock_time: HistogramVec,
179    /// Latency for hummock manager to really process a request after acquire the lock
180    pub hummock_manager_real_process_time: HistogramVec,
181    /// The number of compactions from one level to another level that have been skipped
182    pub compact_skip_frequency: IntCounterVec,
183    /// Bytes of lsm tree needed to reach balance
184    pub compact_pending_bytes: IntGaugeVec,
185    /// Per level compression ratio
186    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
187    /// Per level number of running compaction task
188    pub level_compact_task_cnt: IntGaugeVec,
189    pub time_after_last_observation: Arc<AtomicU64>,
190    pub l0_compact_level_count: HistogramVec,
191    pub compact_task_size: HistogramVec,
192    pub compact_task_file_count: HistogramVec,
193    pub compact_task_batch_count: HistogramVec,
194    pub split_compaction_group_count: IntCounterVec,
195    pub state_table_count: IntGaugeVec,
196    pub branched_sst_count: IntGaugeVec,
197    pub compact_task_trivial_move_sst_count: HistogramVec,
198
199    pub compaction_event_consumed_latency: Histogram,
200    pub compaction_event_loop_iteration_latency: Histogram,
201    pub time_travel_vacuum_metadata_latency: Histogram,
202    pub time_travel_write_metadata_latency: Histogram,
203
204    // ********************************** Object Store ************************************
205    // Object store related metrics (for backup/restore and version checkpoint)
206    pub object_store_metric: Arc<ObjectStoreMetrics>,
207
208    // ********************************** Source ************************************
209    /// supervisor for which source is still up.
210    pub source_is_up: LabelGuardedIntGaugeVec,
211    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,
212
213    // ********************************** Fragment ************************************
214    /// A dummy gauge metrics with its label to be the mapping from actor id to fragment id
215    pub actor_info: IntGaugeVec,
216    /// A dummy gauge metrics with its label to be the mapping from table id to actor id
217    pub table_info: IntGaugeVec,
218    /// A dummy gauge metrics with its label to be the mapping from actor id to sink id
219    pub sink_info: IntGaugeVec,
220    /// A dummy gauge metrics with its label to be relation info
221    pub relation_info: IntGaugeVec,
222    /// A dummy gauge metrics with its label to be the mapping from database id to database name
223    pub database_info: IntGaugeVec,
224    /// Backfill progress per fragment
225    pub backfill_fragment_progress: IntGaugeVec,
226    /// Max subscription retention configured for the table's changelog.
227    pub streaming_table_change_log_retention_seconds: IntGaugeVec,
228
229    // ********************************** System Params ************************************
230    /// A dummy gauge metric with labels carrying system parameter info.
231    /// Labels: (name, value)
232    pub system_param_info: IntGaugeVec,
233
234    /// Write throughput of commit epoch for each stable
235    pub table_write_throughput: IntCounterVec,
236
237    /// The number of compaction groups that have been triggered to move
238    pub merge_compaction_group_count: IntCounterVec,
239
240    // ********************************** Auto Schema Change ************************************
241    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
242    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
243    pub auto_schema_change_latency: LabelGuardedHistogramVec,
244
245    pub time_travel_version_replay_latency: Histogram,
246
247    pub compaction_group_count: IntGauge,
248    pub compaction_group_size: IntGaugeVec,
249    pub compaction_group_file_count: IntGaugeVec,
250    pub compaction_group_throughput: IntGaugeVec,
251
252    // ********************************** Refresh Manager ************************************
253    pub refresh_job_duration: LabelGuardedUintGaugeVec,
254    pub refresh_job_finish_cnt: LabelGuardedIntCounterVec,
255    pub refresh_cron_job_trigger_cnt: LabelGuardedIntCounterVec,
256    pub refresh_cron_job_miss_cnt: LabelGuardedIntCounterVec,
257}
258
259pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
260    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));
261
262impl MetaMetrics {
263    fn new(registry: &Registry) -> Self {
264        let opts = histogram_opts!(
265            "meta_grpc_duration_seconds",
266            "gRPC latency of meta services",
267            exponential_buckets(0.0001, 2.0, 20).unwrap() // max 52s
268        );
269        let grpc_latency =
270            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();
271
272        let opts = histogram_opts!(
273            "meta_barrier_duration_seconds",
274            "barrier latency",
275            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
276        );
277        let barrier_latency =
278            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
279                .unwrap();
280
281        let opts = histogram_opts!(
282            "meta_barrier_wait_commit_duration_seconds",
283            "barrier_wait_commit_latency",
284            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
285        );
286        let barrier_wait_commit_latency =
287            register_histogram_with_registry!(opts, registry).unwrap();
288
289        let opts = histogram_opts!(
290            "meta_barrier_send_duration_seconds",
291            "barrier send latency",
292            exponential_buckets(0.1, 1.5, 19).unwrap() // max 148s
293        );
294        let barrier_send_latency =
295            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
296                .unwrap();
297        let barrier_interval_by_database = register_gauge_vec_with_registry!(
298            "meta_barrier_interval_by_database",
299            "barrier interval of each database",
300            &["database_id"],
301            registry
302        )
303        .unwrap();
304
305        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
306            "all_barrier_nums",
307            "num of of all_barrier",
308            &["database_id"],
309            registry
310        )
311        .unwrap();
312        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
313            "in_flight_barrier_nums",
314            "num of of in_flight_barrier",
315            &["database_id"],
316            registry
317        )
318        .unwrap();
319        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
320            "last_committed_barrier_time",
321            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
322            &["database_id"],
323            registry
324        )
325        .unwrap();
326
327        // snapshot backfill metrics
328        let opts = histogram_opts!(
329            "meta_snapshot_backfill_barrier_duration_seconds",
330            "snapshot backfill barrier latency",
331            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
332        );
333        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
334            opts,
335            &["table_id", "barrier_type"],
336            registry
337        )
338        .unwrap();
339
340        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
341            "meta_snapshot_backfill_upstream_lag",
342            "snapshot backfill upstream_lag",
343            &["table_id"],
344            registry
345        )
346        .unwrap();
347        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
348            "meta_snapshot_backfill_inflight_barrier_num",
349            "snapshot backfill inflight_barrier_num",
350            &["table_id"],
351            registry
352        )
353        .unwrap();
354
355        let max_committed_epoch = register_int_gauge_with_registry!(
356            "storage_max_committed_epoch",
357            "max committed epoch",
358            registry
359        )
360        .unwrap();
361
362        let min_committed_epoch = register_int_gauge_with_registry!(
363            "storage_min_committed_epoch",
364            "min committed epoch",
365            registry
366        )
367        .unwrap();
368
369        let level_sst_num = register_int_gauge_vec_with_registry!(
370            "storage_level_sst_num",
371            "num of SSTs in each level",
372            &["level_index"],
373            registry
374        )
375        .unwrap();
376
377        let level_compact_cnt = register_int_gauge_vec_with_registry!(
378            "storage_level_compact_cnt",
379            "num of SSTs to be merged to next level in each level",
380            &["level_index"],
381            registry
382        )
383        .unwrap();
384
385        let compact_frequency = register_int_counter_vec_with_registry!(
386            "storage_level_compact_frequency",
387            "The number of compactions from one level to another level that have completed or failed.",
388            &["compactor", "group", "task_type", "result"],
389            registry
390        )
391        .unwrap();
392        let compact_skip_frequency = register_int_counter_vec_with_registry!(
393            "storage_skip_compact_frequency",
394            "The number of compactions from one level to another level that have been skipped.",
395            &["level", "type"],
396            registry
397        )
398        .unwrap();
399
400        let version_size =
401            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
402                .unwrap();
403
404        let current_version_id = register_int_gauge_with_registry!(
405            "storage_current_version_id",
406            "current version id",
407            registry
408        )
409        .unwrap();
410
411        let checkpoint_version_id = register_int_gauge_with_registry!(
412            "storage_checkpoint_version_id",
413            "checkpoint version id",
414            registry
415        )
416        .unwrap();
417
418        let min_pinned_version_id = register_int_gauge_with_registry!(
419            "storage_min_pinned_version_id",
420            "min pinned version id",
421            registry
422        )
423        .unwrap();
424
425        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
426            "storage_write_stop_compaction_groups",
427            "compaction groups of write stop state",
428            &["compaction_group_id"],
429            registry
430        )
431        .unwrap();
432
433        let full_gc_trigger_count = register_int_gauge_with_registry!(
434            "storage_full_gc_trigger_count",
435            "the number of attempts to trigger full GC",
436            registry
437        )
438        .unwrap();
439
440        let opts = histogram_opts!(
441            "storage_full_gc_candidate_object_count",
442            "the number of candidate object to delete after scanning object store",
443            exponential_buckets(1.0, 10.0, 6).unwrap()
444        );
445        let full_gc_candidate_object_count =
446            register_histogram_with_registry!(opts, registry).unwrap();
447
448        let opts = histogram_opts!(
449            "storage_full_gc_selected_object_count",
450            "the number of object to delete after filtering by meta node",
451            exponential_buckets(1.0, 10.0, 6).unwrap()
452        );
453        let full_gc_selected_object_count =
454            register_histogram_with_registry!(opts, registry).unwrap();
455
456        let min_safepoint_version_id = register_int_gauge_with_registry!(
457            "storage_min_safepoint_version_id",
458            "min safepoint version id",
459            registry
460        )
461        .unwrap();
462
463        let level_file_size = register_int_gauge_vec_with_registry!(
464            "storage_level_total_file_size",
465            "KBs total file bytes in each level",
466            &["level_index"],
467            registry
468        )
469        .unwrap();
470
471        let version_stats = register_int_gauge_vec_with_registry!(
472            "storage_version_stats",
473            "per table stats in current hummock version",
474            &["table_id", "metric"],
475            registry
476        )
477        .unwrap();
478
479        let materialized_view_stats = register_int_gauge_vec_with_registry!(
480            "storage_materialized_view_stats",
481            "per materialized view stats in current hummock version",
482            &["table_id", "metric"],
483            registry
484        )
485        .unwrap();
486
487        let stale_object_count = register_int_gauge_with_registry!(
488            "storage_stale_object_count",
489            "total number of objects that is no longer referenced by versions.",
490            registry
491        )
492        .unwrap();
493
494        let stale_object_size = register_int_gauge_with_registry!(
495            "storage_stale_object_size",
496            "total size of objects that is no longer referenced by versions.",
497            registry
498        )
499        .unwrap();
500
501        let old_version_object_count = register_int_gauge_with_registry!(
502            "storage_old_version_object_count",
503            "total number of objects that is still referenced by non-current versions",
504            registry
505        )
506        .unwrap();
507
508        let old_version_object_size = register_int_gauge_with_registry!(
509            "storage_old_version_object_size",
510            "total size of objects that is still referenced by non-current versions",
511            registry
512        )
513        .unwrap();
514
515        let current_version_object_count = register_int_gauge_with_registry!(
516            "storage_current_version_object_count",
517            "total number of objects that is referenced by current version",
518            registry
519        )
520        .unwrap();
521
522        let current_version_object_size = register_int_gauge_with_registry!(
523            "storage_current_version_object_size",
524            "total size of objects that is referenced by current version",
525            registry
526        )
527        .unwrap();
528
529        let total_object_count = register_int_gauge_with_registry!(
530            "storage_total_object_count",
531            "Total number of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
532            registry
533        ).unwrap();
534
535        let total_object_size = register_int_gauge_with_registry!(
536            "storage_total_object_size",
537            "Total size of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
538            registry
539        ).unwrap();
540
541        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
542            "storage_table_change_log_object_count",
543            "per table change log object count",
544            &["table_id"],
545            registry
546        )
547        .unwrap();
548
549        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
550            "storage_table_change_log_object_size",
551            "per table change log object size",
552            &["table_id"],
553            registry
554        )
555        .unwrap();
556
557        let table_change_log_min_epoch = register_int_gauge_vec_with_registry!(
558            "storage_table_change_log_min_epoch",
559            "min epoch currently retained in table change log",
560            &["table_id"],
561            registry
562        )
563        .unwrap();
564
565        let time_travel_object_count = register_int_gauge_with_registry!(
566            "storage_time_travel_object_count",
567            "total number of objects that is referenced by time travel.",
568            registry
569        )
570        .unwrap();
571
572        let delta_log_count = register_int_gauge_with_registry!(
573            "storage_delta_log_count",
574            "total number of hummock version delta log",
575            registry
576        )
577        .unwrap();
578
579        let opts = histogram_opts!(
580            "storage_version_checkpoint_latency",
581            "hummock version checkpoint latency",
582            exponential_buckets(0.1, 1.5, 20).unwrap()
583        );
584        let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();
585
586        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
587            "hummock_manager_lock_time",
588            "latency for hummock manager to acquire the rwlock",
589            &["lock_name", "lock_type"],
590            registry
591        )
592        .unwrap();
593
594        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
595            "meta_hummock_manager_real_process_time",
596            "latency for hummock manager to really process the request",
597            &["method"],
598            registry
599        )
600        .unwrap();
601
602        let worker_num = register_int_gauge_vec_with_registry!(
603            "worker_num",
604            "number of nodes in the cluster",
605            &["worker_type"],
606            registry,
607        )
608        .unwrap();
609
610        let meta_type = register_int_gauge_vec_with_registry!(
611            "meta_num",
612            "role of meta nodes in the cluster",
613            &["worker_addr", "role"],
614            registry,
615        )
616        .unwrap();
617
618        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
619            "storage_compact_pending_bytes",
620            "bytes of lsm tree needed to reach balance",
621            &["group"],
622            registry
623        )
624        .unwrap();
625
626        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
627            "storage_compact_level_compression_ratio",
628            "compression ratio of each level of the lsm tree",
629            &["group", "level", "algorithm"],
630            registry
631        )
632        .unwrap();
633
634        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
635            "storage_level_compact_task_cnt",
636            "num of compact_task organized by group and level",
637            &["task"],
638            registry
639        )
640        .unwrap();
641
642        let time_travel_vacuum_metadata_latency = register_histogram_with_registry!(
643            histogram_opts!(
644                "storage_time_travel_vacuum_metadata_latency",
645                "Latency of vacuuming metadata for time travel",
646                exponential_buckets(0.1, 1.5, 20).unwrap()
647            ),
648            registry
649        )
650        .unwrap();
651        let time_travel_write_metadata_latency = register_histogram_with_registry!(
652            histogram_opts!(
653                "storage_time_travel_write_metadata_latency",
654                "Latency of writing metadata for time travel",
655                exponential_buckets(0.1, 1.5, 20).unwrap()
656            ),
657            registry
658        )
659        .unwrap();
660
661        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
662
663        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
664            "recovery_failure_cnt",
665            "Number of failed recovery attempts",
666            &["recovery_type"],
667            registry
668        )
669        .unwrap();
670        let opts = histogram_opts!(
671            "recovery_latency",
672            "Latency of the recovery process",
673            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
674        );
675        let recovery_latency =
676            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();
677
678        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
679            "auto_schema_change_failure_cnt",
680            "Number of failed auto schema change",
681            &["table_id", "table_name"],
682            registry
683        )
684        .unwrap();
685
686        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
687            "auto_schema_change_success_cnt",
688            "Number of success auto schema change",
689            &["table_id", "table_name"],
690            registry
691        )
692        .unwrap();
693
694        let opts = histogram_opts!(
695            "auto_schema_change_latency",
696            "Latency of the auto schema change process",
697            exponential_buckets(0.1, 1.5, 20).unwrap() // max 221s
698        );
699        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
700            opts,
701            &["table_id", "table_name"],
702            registry
703        )
704        .unwrap();
705
706        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
707            "source_status_is_up",
708            "source is up or not",
709            &["source_id", "source_name"],
710            registry
711        )
712        .unwrap();
713        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());
714
715        let actor_info = register_int_gauge_vec_with_registry!(
716            "actor_info",
717            "Mapping from actor id to (fragment id, compute node)",
718            &["actor_id", "fragment_id", "compute_node"],
719            registry
720        )
721        .unwrap();
722
723        let table_info = register_int_gauge_vec_with_registry!(
724            "table_info",
725            "Mapping from table id to (actor id, table name)",
726            &[
727                "materialized_view_id",
728                "table_id",
729                "fragment_id",
730                "table_name",
731                "table_type",
732                "compaction_group_id"
733            ],
734            registry
735        )
736        .unwrap();
737
738        let sink_info = register_int_gauge_vec_with_registry!(
739            "sink_info",
740            "Mapping from actor id to (actor id, sink name)",
741            &["actor_id", "sink_id", "sink_name",],
742            registry
743        )
744        .unwrap();
745
746        let relation_info = register_int_gauge_vec_with_registry!(
747            "relation_info",
748            "Information of the database relation (table/source/sink/materialized view/index/internal)",
749            &["id", "database", "schema", "name", "resource_group", "type"],
750            registry
751        )
752        .unwrap();
753
754        let streaming_table_change_log_retention_seconds = register_int_gauge_vec_with_registry!(
755            "streaming_table_change_log_retention_seconds",
756            "Max subscription retention configured for the table change log in seconds",
757            &["table_id"],
758            registry
759        )
760        .unwrap();
761
762        let database_info = register_int_gauge_vec_with_registry!(
763            "database_info",
764            "Mapping from database id to database name",
765            &["database_id", "database_name"],
766            registry
767        )
768        .unwrap();
769
770        let backfill_fragment_progress = register_int_gauge_vec_with_registry!(
771            "backfill_fragment_progress",
772            "Backfill progress per fragment",
773            &[
774                "job_id",
775                "fragment_id",
776                "backfill_state_table_id",
777                "backfill_target_relation_id",
778                "backfill_target_relation_name",
779                "backfill_target_relation_type",
780                "backfill_type",
781                "backfill_epoch",
782                "upstream_type",
783                "backfill_progress",
784            ],
785            registry
786        )
787        .unwrap();
788
789        let l0_compact_level_count = register_histogram_vec_with_registry!(
790            "storage_l0_compact_level_count",
791            "level_count of l0 compact task",
792            &["group", "type"],
793            registry
794        )
795        .unwrap();
796
797        // System parameter info
798        let system_param_info = register_int_gauge_vec_with_registry!(
799            "system_param_info",
800            "Information of system parameters",
801            &["name", "value"],
802            registry
803        )
804        .unwrap();
805
806        let opts = histogram_opts!(
807            "storage_compact_task_size",
808            "Total size of compact that have been issued to state store",
809            exponential_buckets(1048576.0, 2.0, 16).unwrap()
810        );
811
812        let compact_task_size =
813            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();
814
815        let compact_task_file_count = register_histogram_vec_with_registry!(
816            "storage_compact_task_file_count",
817            "file count of compact task",
818            &["group", "type"],
819            registry
820        )
821        .unwrap();
822        let opts = histogram_opts!(
823            "storage_compact_task_batch_count",
824            "count of compact task batch",
825            exponential_buckets(1.0, 2.0, 8).unwrap()
826        );
827        let compact_task_batch_count =
828            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();
829
830        let table_write_throughput = register_int_counter_vec_with_registry!(
831            "storage_commit_write_throughput",
832            "The number of compactions from one level to another level that have been skipped.",
833            &["table_id"],
834            registry
835        )
836        .unwrap();
837
838        let split_compaction_group_count = register_int_counter_vec_with_registry!(
839            "storage_split_compaction_group_count",
840            "Count of trigger split compaction group",
841            &["group"],
842            registry
843        )
844        .unwrap();
845
846        let state_table_count = register_int_gauge_vec_with_registry!(
847            "storage_state_table_count",
848            "Count of stable table per compaction group",
849            &["group"],
850            registry
851        )
852        .unwrap();
853
854        let branched_sst_count = register_int_gauge_vec_with_registry!(
855            "storage_branched_sst_count",
856            "Count of branched sst per compaction group",
857            &["group"],
858            registry
859        )
860        .unwrap();
861
862        let opts = histogram_opts!(
863            "storage_compaction_event_consumed_latency",
864            "The latency(ms) of each event being consumed",
865            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
866        );
867        let compaction_event_consumed_latency =
868            register_histogram_with_registry!(opts, registry).unwrap();
869
870        let opts = histogram_opts!(
871            "storage_compaction_event_loop_iteration_latency",
872            "The latency(ms) of each iteration of the compaction event loop",
873            exponential_buckets(1.0, 1.5, 30).unwrap() // max 191s
874        );
875        let compaction_event_loop_iteration_latency =
876            register_histogram_with_registry!(opts, registry).unwrap();
877
878        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
879            "storage_merge_compaction_group_count",
880            "Count of trigger merge compaction group",
881            &["group"],
882            registry
883        )
884        .unwrap();
885
886        let opts = histogram_opts!(
887            "storage_time_travel_version_replay_latency",
888            "The latency(ms) of replaying a hummock version for time travel",
889            exponential_buckets(0.01, 10.0, 6).unwrap()
890        );
891        let time_travel_version_replay_latency =
892            register_histogram_with_registry!(opts, registry).unwrap();
893
894        let compaction_group_count = register_int_gauge_with_registry!(
895            "storage_compaction_group_count",
896            "The number of compaction groups",
897            registry,
898        )
899        .unwrap();
900
901        let compaction_group_size = register_int_gauge_vec_with_registry!(
902            "storage_compaction_group_size",
903            "The size of compaction group",
904            &["group"],
905            registry
906        )
907        .unwrap();
908
909        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
910            "storage_compaction_group_file_count",
911            "The file count of compaction group",
912            &["group"],
913            registry
914        )
915        .unwrap();
916
917        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
918            "storage_compaction_group_throughput",
919            "The throughput of compaction group",
920            &["group"],
921            registry
922        )
923        .unwrap();
924
925        let opts = histogram_opts!(
926            "storage_compact_task_trivial_move_sst_count",
927            "sst count of compact trivial-move task",
928            exponential_buckets(1.0, 2.0, 8).unwrap()
929        );
930        let compact_task_trivial_move_sst_count =
931            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();
932
933        let refresh_job_duration = register_guarded_uint_gauge_vec_with_registry!(
934            "meta_refresh_job_duration",
935            "The duration of refresh job",
936            &["table_id", "status"],
937            registry
938        )
939        .unwrap();
940        let refresh_job_finish_cnt = register_guarded_int_counter_vec_with_registry!(
941            "meta_refresh_job_finish_cnt",
942            "The number of finished refresh jobs",
943            &["table_id", "status"],
944            registry
945        )
946        .unwrap();
947        let refresh_cron_job_trigger_cnt = register_guarded_int_counter_vec_with_registry!(
948            "meta_refresh_cron_job_trigger_cnt",
949            "The number of cron refresh jobs triggered",
950            &["table_id"],
951            registry
952        )
953        .unwrap();
954        let refresh_cron_job_miss_cnt = register_guarded_int_counter_vec_with_registry!(
955            "meta_refresh_cron_job_miss_cnt",
956            "The number of cron refresh jobs missed",
957            &["table_id"],
958            registry
959        )
960        .unwrap();
961
962        Self {
963            grpc_latency,
964            barrier_latency,
965            barrier_wait_commit_latency,
966            barrier_send_latency,
967            all_barrier_nums,
968            in_flight_barrier_nums,
969            last_committed_barrier_time,
970            barrier_interval_by_database,
971            snapshot_backfill_barrier_latency,
972            snapshot_backfill_lag,
973            snapshot_backfill_inflight_barrier_num,
974            recovery_failure_cnt,
975            recovery_latency,
976
977            max_committed_epoch,
978            min_committed_epoch,
979            level_sst_num,
980            level_compact_cnt,
981            compact_frequency,
982            compact_skip_frequency,
983            level_file_size,
984            version_size,
985            version_stats,
986            materialized_view_stats,
987            stale_object_count,
988            stale_object_size,
989            old_version_object_count,
990            old_version_object_size,
991            time_travel_object_count,
992            current_version_object_count,
993            current_version_object_size,
994            total_object_count,
995            total_object_size,
996            table_change_log_object_count,
997            table_change_log_object_size,
998            table_change_log_min_epoch,
999            delta_log_count,
1000            version_checkpoint_latency,
1001            current_version_id,
1002            checkpoint_version_id,
1003            min_pinned_version_id,
1004            min_safepoint_version_id,
1005            write_stop_compaction_groups,
1006            full_gc_trigger_count,
1007            full_gc_candidate_object_count,
1008            full_gc_selected_object_count,
1009            hummock_manager_lock_time,
1010            hummock_manager_real_process_time,
1011            time_after_last_observation: Arc::new(AtomicU64::new(0)),
1012            worker_num,
1013            meta_type,
1014            compact_pending_bytes,
1015            compact_level_compression_ratio,
1016            level_compact_task_cnt,
1017            object_store_metric,
1018            source_is_up,
1019            source_enumerator_metrics,
1020            actor_info,
1021            table_info,
1022            sink_info,
1023            relation_info,
1024            database_info,
1025            backfill_fragment_progress,
1026            streaming_table_change_log_retention_seconds,
1027            system_param_info,
1028            l0_compact_level_count,
1029            compact_task_size,
1030            compact_task_file_count,
1031            compact_task_batch_count,
1032            compact_task_trivial_move_sst_count,
1033            table_write_throughput,
1034            split_compaction_group_count,
1035            state_table_count,
1036            branched_sst_count,
1037            compaction_event_consumed_latency,
1038            compaction_event_loop_iteration_latency,
1039            auto_schema_change_failure_cnt,
1040            auto_schema_change_success_cnt,
1041            auto_schema_change_latency,
1042            merge_compaction_group_count,
1043            time_travel_version_replay_latency,
1044            compaction_group_count,
1045            compaction_group_size,
1046            compaction_group_file_count,
1047            compaction_group_throughput,
1048            refresh_job_duration,
1049            refresh_job_finish_cnt,
1050            refresh_cron_job_trigger_cnt,
1051            refresh_cron_job_miss_cnt,
1052            time_travel_vacuum_metadata_latency,
1053            time_travel_write_metadata_latency,
1054        }
1055    }
1056
1057    #[cfg(test)]
1058    pub fn for_test(registry: &Registry) -> Self {
1059        Self::new(registry)
1060    }
1061}
1062impl Default for MetaMetrics {
1063    fn default() -> Self {
1064        GLOBAL_META_METRICS.clone()
1065    }
1066}
1067
1068/// Refresh `system_param_info` metrics by reading current system parameters.
1069pub async fn refresh_system_param_info_metrics(
1070    system_params_controller: &SystemParamsControllerRef,
1071    meta_metrics: Arc<MetaMetrics>,
1072) {
1073    let params_info = system_params_controller.get_params().await.get_all();
1074
1075    meta_metrics.system_param_info.reset();
1076    for info in params_info {
1077        meta_metrics
1078            .system_param_info
1079            .with_label_values(&[info.name, &info.value])
1080            .set(1);
1081    }
1082}
1083
1084pub fn start_worker_info_monitor(
1085    metadata_manager: MetadataManager,
1086    election_client: ElectionClientRef,
1087    interval: Duration,
1088    meta_metrics: Arc<MetaMetrics>,
1089) -> (JoinHandle<()>, Sender<()>) {
1090    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1091    let join_handle = tokio::spawn(async move {
1092        let mut monitor_interval = tokio::time::interval(interval);
1093        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1094        loop {
1095            tokio::select! {
1096                // Wait for interval
1097                _ = monitor_interval.tick() => {},
1098                // Shutdown monitor
1099                _ = &mut shutdown_rx => {
1100                    tracing::info!("Worker number monitor is stopped");
1101                    return;
1102                }
1103            }
1104
1105            let node_map = match metadata_manager.count_worker_node().await {
1106                Ok(node_map) => node_map,
1107                Err(err) => {
1108                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
1109                    continue;
1110                }
1111            };
1112
1113            // Reset metrics to clean the stale labels e.g. invalid lease ids
1114            meta_metrics.worker_num.reset();
1115            meta_metrics.meta_type.reset();
1116
1117            for (worker_type, worker_num) in node_map {
1118                meta_metrics
1119                    .worker_num
1120                    .with_label_values(&[(worker_type.as_str_name())])
1121                    .set(worker_num as i64);
1122            }
1123            if let Ok(meta_members) = election_client.get_members().await {
1124                meta_metrics
1125                    .worker_num
1126                    .with_label_values(&[WorkerType::Meta.as_str_name()])
1127                    .set(meta_members.len() as i64);
1128                meta_members.into_iter().for_each(|m| {
1129                    let role = if m.is_leader { "leader" } else { "follower" };
1130                    meta_metrics
1131                        .meta_type
1132                        .with_label_values(&[m.id.as_str(), role])
1133                        .set(1);
1134                });
1135            }
1136        }
1137    });
1138
1139    (join_handle, shutdown_tx)
1140}
1141
1142pub async fn refresh_fragment_info_metrics(
1143    catalog_controller: &CatalogControllerRef,
1144    cluster_controller: &ClusterControllerRef,
1145    hummock_manager: &HummockManagerRef,
1146    meta_metrics: Arc<MetaMetrics>,
1147) {
1148    let worker_nodes = match cluster_controller
1149        .list_workers(Some(WorkerType::ComputeNode.into()), None)
1150        .await
1151    {
1152        Ok(worker_nodes) => worker_nodes,
1153        Err(err) => {
1154            tracing::warn!(error=%err.as_report(), "fail to list worker node");
1155            return;
1156        }
1157    };
1158    let actor_locations = match catalog_controller.list_actor_locations() {
1159        Ok(actor_locations) => actor_locations,
1160        Err(err) => {
1161            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
1162            return;
1163        }
1164    };
1165    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
1166        Ok(sink_actor_mapping) => sink_actor_mapping,
1167        Err(err) => {
1168            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
1169            return;
1170        }
1171    };
1172    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
1173        Ok(fragment_state_tables) => fragment_state_tables,
1174        Err(err) => {
1175            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
1176            return;
1177        }
1178    };
1179    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
1180        Ok(mapping) => mapping,
1181        Err(err) => {
1182            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
1183            return;
1184        }
1185    };
1186
1187    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
1188        .into_iter()
1189        .map(|worker_node| {
1190            let addr = match worker_node.host {
1191                Some(host) => format!("{}:{}", host.host, host.port),
1192                None => "".to_owned(),
1193            };
1194            (worker_node.id, addr)
1195        })
1196        .collect();
1197    let table_compaction_group_id_mapping = hummock_manager
1198        .get_table_compaction_group_id_mapping()
1199        .await;
1200
1201    // Start fresh with a reset to clear all outdated labels. This is safe since we always
1202    // report full info on each interval.
1203    meta_metrics.actor_info.reset();
1204    meta_metrics.table_info.reset();
1205    meta_metrics.sink_info.reset();
1206    for actor_location in actor_locations {
1207        let actor_id_str = actor_location.actor_id.to_string();
1208        let fragment_id_str = actor_location.fragment_id.to_string();
1209        // Report a dummy gauge metrics with (fragment id, actor id, node
1210        // address) as its label
1211        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
1212            meta_metrics
1213                .actor_info
1214                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
1215                .set(1);
1216        }
1217    }
1218    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
1219        let sink_id_str = sink_id.to_string();
1220        for actor_id in actor_ids {
1221            let actor_id_str = actor_id.to_string();
1222            meta_metrics
1223                .sink_info
1224                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
1225                .set(1);
1226        }
1227    }
1228    for PartialFragmentStateTables {
1229        fragment_id,
1230        job_id,
1231        state_table_ids,
1232    } in fragment_state_tables
1233    {
1234        let fragment_id_str = fragment_id.to_string();
1235        let job_id_str = job_id.to_string();
1236        for table_id in state_table_ids.into_inner() {
1237            let table_id_str = table_id.to_string();
1238            let (table_name, table_type) = table_name_and_type_mapping
1239                .get(&table_id)
1240                .cloned()
1241                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
1242            let compaction_group_id = table_compaction_group_id_mapping
1243                .get(&table_id)
1244                .map(|cg_id| cg_id.to_string())
1245                .unwrap_or_else(|| "unknown".to_owned());
1246            meta_metrics
1247                .table_info
1248                .with_label_values(&[
1249                    &job_id_str,
1250                    &table_id_str,
1251                    &fragment_id_str,
1252                    &table_name,
1253                    &table_type,
1254                    &compaction_group_id,
1255                ])
1256                .set(1);
1257        }
1258    }
1259}
1260
1261pub async fn refresh_relation_info_metrics(
1262    catalog_controller: &CatalogControllerRef,
1263    meta_metrics: Arc<MetaMetrics>,
1264) {
1265    let table_objects = match catalog_controller.list_table_objects().await {
1266        Ok(table_objects) => table_objects,
1267        Err(err) => {
1268            tracing::warn!(error=%err.as_report(), "fail to get table objects");
1269            return;
1270        }
1271    };
1272
1273    let source_objects = match catalog_controller.list_source_objects().await {
1274        Ok(source_objects) => source_objects,
1275        Err(err) => {
1276            tracing::warn!(error=%err.as_report(), "fail to get source objects");
1277            return;
1278        }
1279    };
1280
1281    let sink_objects = match catalog_controller.list_sink_objects().await {
1282        Ok(sink_objects) => sink_objects,
1283        Err(err) => {
1284            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
1285            return;
1286        }
1287    };
1288    let subscriptions = match catalog_controller.list_subscriptions().await {
1289        Ok(subscriptions) => subscriptions,
1290        Err(err) => {
1291            tracing::warn!(error=%err.as_report(), "fail to get subscription objects");
1292            return;
1293        }
1294    };
1295
1296    meta_metrics.relation_info.reset();
1297    meta_metrics
1298        .streaming_table_change_log_retention_seconds
1299        .reset();
1300
1301    for (id, db, schema, name, resource_group, table_type) in table_objects {
1302        let relation_type = match table_type {
1303            TableType::Table => "table",
1304            TableType::MaterializedView => "materialized_view",
1305            TableType::Index | TableType::VectorIndex => "index",
1306            TableType::Internal => "internal",
1307        };
1308        meta_metrics
1309            .relation_info
1310            .with_label_values(&[
1311                &id.to_string(),
1312                &db,
1313                &schema,
1314                &name,
1315                &resource_group,
1316                &relation_type.to_owned(),
1317            ])
1318            .set(1);
1319    }
1320
1321    for (id, db, schema, name, resource_group) in source_objects {
1322        meta_metrics
1323            .relation_info
1324            .with_label_values(&[
1325                &id.to_string(),
1326                &db,
1327                &schema,
1328                &name,
1329                &resource_group,
1330                &"source".to_owned(),
1331            ])
1332            .set(1);
1333    }
1334
1335    for (id, db, schema, name, resource_group) in sink_objects {
1336        meta_metrics
1337            .relation_info
1338            .with_label_values(&[
1339                &id.to_string(),
1340                &db,
1341                &schema,
1342                &name,
1343                &resource_group,
1344                &"sink".to_owned(),
1345            ])
1346            .set(1);
1347    }
1348
1349    let mut max_retention_by_table = HashMap::new();
1350    for subscription in subscriptions {
1351        max_retention_by_table
1352            .entry(subscription.dependent_table_id)
1353            .and_modify(|retention: &mut u64| {
1354                *retention = (*retention).max(subscription.retention_seconds);
1355            })
1356            .or_insert(subscription.retention_seconds);
1357    }
1358    for (table_id, retention_seconds) in max_retention_by_table {
1359        meta_metrics
1360            .streaming_table_change_log_retention_seconds
1361            .with_label_values(&[&table_id.to_string()])
1362            .set(retention_seconds as _);
1363    }
1364}
1365
1366pub async fn refresh_database_info_metrics(
1367    catalog_controller: &CatalogControllerRef,
1368    meta_metrics: Arc<MetaMetrics>,
1369) {
1370    let databases = match catalog_controller.list_databases().await {
1371        Ok(databases) => databases,
1372        Err(err) => {
1373            tracing::warn!(error=%err.as_report(), "fail to get databases");
1374            return;
1375        }
1376    };
1377
1378    meta_metrics.database_info.reset();
1379
1380    for db in databases {
1381        meta_metrics
1382            .database_info
1383            .with_label_values(&[&db.id.to_string(), &db.name])
1384            .set(1);
1385    }
1386}
1387
1388fn extract_backfill_fragment_info(
1389    distribution: &FragmentDistribution,
1390) -> Option<BackfillFragmentInfo> {
1391    let backfill_type =
1392        if distribution.fragment_type_mask & FragmentTypeFlag::SourceScan as u32 != 0 {
1393            "SOURCE"
1394        } else if distribution.fragment_type_mask
1395            & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32
1396                | FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32)
1397            != 0
1398        {
1399            "SNAPSHOT_BACKFILL"
1400        } else if distribution.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0 {
1401            "ARRANGEMENT_OR_NO_SHUFFLE"
1402        } else {
1403            return None;
1404        };
1405
1406    let stream_node = distribution.node.as_ref()?;
1407    let mut info = None;
1408    match backfill_type {
1409        "SOURCE" => {
1410            visit_stream_node_source_backfill(stream_node, |node| {
1411                info = Some(BackfillFragmentInfo {
1412                    job_id: distribution.table_id.as_raw_id(),
1413                    fragment_id: distribution.fragment_id.as_raw_id(),
1414                    backfill_state_table_id: node
1415                        .state_table
1416                        .as_ref()
1417                        .map(|table| table.id.as_raw_id())
1418                        .unwrap_or_default(),
1419                    backfill_target_relation_id: node.upstream_source_id.as_raw_id(),
1420                    backfill_type,
1421                    backfill_epoch: 0,
1422                });
1423            });
1424        }
1425        "SNAPSHOT_BACKFILL" | "ARRANGEMENT_OR_NO_SHUFFLE" => {
1426            visit_stream_node_stream_scan(stream_node, |node| {
1427                info = Some(BackfillFragmentInfo {
1428                    job_id: distribution.table_id.as_raw_id(),
1429                    fragment_id: distribution.fragment_id.as_raw_id(),
1430                    backfill_state_table_id: node
1431                        .state_table
1432                        .as_ref()
1433                        .map(|table| table.id.as_raw_id())
1434                        .unwrap_or_default(),
1435                    backfill_target_relation_id: node.table_id.as_raw_id(),
1436                    backfill_type,
1437                    backfill_epoch: node.snapshot_backfill_epoch.unwrap_or_default(),
1438                });
1439            });
1440        }
1441        _ => {}
1442    }
1443
1444    info
1445}
1446
1447pub async fn refresh_backfill_progress_metrics(
1448    catalog_controller: &CatalogControllerRef,
1449    hummock_manager: &HummockManagerRef,
1450    barrier_manager: &BarrierManagerRef,
1451    meta_metrics: Arc<MetaMetrics>,
1452) {
1453    let fragment_descs = match catalog_controller.list_fragment_descs_with_node(true).await {
1454        Ok(fragment_descs) => fragment_descs,
1455        Err(err) => {
1456            tracing::warn!(error=%err.as_report(), "fail to list creating fragment descs");
1457            return;
1458        }
1459    };
1460
1461    let backfill_infos: HashMap<(u32, u32), BackfillFragmentInfo> = fragment_descs
1462        .iter()
1463        .filter_map(|(distribution, _)| extract_backfill_fragment_info(distribution))
1464        .map(|info| ((info.job_id, info.fragment_id), info))
1465        .collect();
1466
1467    let fragment_progresses = match barrier_manager.get_fragment_backfill_progress().await {
1468        Ok(progress) => progress,
1469        Err(err) => {
1470            tracing::warn!(error=%err.as_report(), "fail to get fragment backfill progress");
1471            return;
1472        }
1473    };
1474
1475    let progress_by_fragment: HashMap<(u32, u32), _> = fragment_progresses
1476        .into_iter()
1477        .map(|progress| {
1478            (
1479                (
1480                    progress.job_id.as_raw_id(),
1481                    progress.fragment_id.as_raw_id(),
1482                ),
1483                progress,
1484            )
1485        })
1486        .collect();
1487
1488    let version_stats = hummock_manager.get_version_stats().await;
1489
1490    let relation_ids: HashSet<_> = backfill_infos
1491        .values()
1492        .map(|info| ObjectId::new(info.backfill_target_relation_id))
1493        .collect();
1494    let relation_objects = match catalog_controller
1495        .list_relation_objects_by_ids(&relation_ids)
1496        .await
1497    {
1498        Ok(relation_objects) => relation_objects,
1499        Err(err) => {
1500            tracing::warn!(error=%err.as_report(), "fail to get relation objects");
1501            return;
1502        }
1503    };
1504
1505    let mut relation_info = HashMap::new();
1506    for (id, db, schema, name, rel_type) in relation_objects {
1507        relation_info.insert(id.as_raw_id(), (db, schema, name, rel_type));
1508    }
1509
1510    meta_metrics.backfill_fragment_progress.reset();
1511
1512    for info in backfill_infos.values() {
1513        let progress = progress_by_fragment.get(&(info.job_id, info.fragment_id));
1514        let (db, schema, name, rel_type) = relation_info
1515            .get(&info.backfill_target_relation_id)
1516            .cloned()
1517            .unwrap_or_else(|| {
1518                (
1519                    "unknown".to_owned(),
1520                    "unknown".to_owned(),
1521                    "unknown".to_owned(),
1522                    "unknown".to_owned(),
1523                )
1524            });
1525
1526        let job_id_str = info.job_id.to_string();
1527        let fragment_id_str = info.fragment_id.to_string();
1528        let backfill_state_table_id_str = info.backfill_state_table_id.to_string();
1529        let backfill_target_relation_id_str = info.backfill_target_relation_id.to_string();
1530        let backfill_target_relation_name_str = format!("{db}.{schema}.{name}");
1531        let backfill_target_relation_type_str = rel_type;
1532        let backfill_type_str = info.backfill_type.to_owned();
1533        let backfill_epoch_str = info.backfill_epoch.to_string();
1534        let total_key_count = version_stats
1535            .table_stats
1536            .get(&TableId::new(info.backfill_target_relation_id))
1537            .map(|stats| stats.total_key_count as u64);
1538
1539        let progress_label = match (info.backfill_type, progress) {
1540            ("SOURCE", Some(progress)) => format!("{} consumed rows", progress.consumed_rows),
1541            ("SOURCE", None) => "0 consumed rows".to_owned(),
1542            (_, Some(progress)) if progress.done => {
1543                let total = total_key_count.unwrap_or(0);
1544                format!("100.0000% ({}/{})", total, total)
1545            }
1546            (_, Some(progress)) => {
1547                let total = total_key_count.unwrap_or(0);
1548                if total == 0 {
1549                    "100.0000% (0/0)".to_owned()
1550                } else {
1551                    let raw = (progress.consumed_rows as f64) / (total as f64) * 100.0;
1552                    format!(
1553                        "{:.4}% ({}/{})",
1554                        raw.min(100.0),
1555                        progress.consumed_rows,
1556                        total
1557                    )
1558                }
1559            }
1560            (_, None) => "0.0000% (0/0)".to_owned(),
1561        };
1562        let upstream_type_str = progress
1563            .map(|progress| progress.upstream_type.to_string())
1564            .unwrap_or_else(|| "Unknown".to_owned());
1565
1566        meta_metrics
1567            .backfill_fragment_progress
1568            .with_label_values(&[
1569                &job_id_str,
1570                &fragment_id_str,
1571                &backfill_state_table_id_str,
1572                &backfill_target_relation_id_str,
1573                &backfill_target_relation_name_str,
1574                &backfill_target_relation_type_str,
1575                &backfill_type_str,
1576                &backfill_epoch_str,
1577                &upstream_type_str,
1578                &progress_label,
1579            ])
1580            .set(1);
1581    }
1582}
1583
1584pub fn start_info_monitor(
1585    metadata_manager: MetadataManager,
1586    hummock_manager: HummockManagerRef,
1587    barrier_manager: BarrierManagerRef,
1588    system_params_controller: SystemParamsControllerRef,
1589    meta_metrics: Arc<MetaMetrics>,
1590) -> (JoinHandle<()>, Sender<()>) {
1591    const COLLECT_INTERVAL_SECONDS: u64 = 60;
1592
1593    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1594    let join_handle = tokio::spawn(async move {
1595        let mut monitor_interval =
1596            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
1597        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1598        loop {
1599            tokio::select! {
1600                // Wait for interval
1601                _ = monitor_interval.tick() => {},
1602                // Shutdown monitor
1603                _ = &mut shutdown_rx => {
1604                    tracing::info!("Meta info monitor is stopped");
1605                    return;
1606                }
1607            }
1608
1609            // Fragment and relation info
1610            refresh_fragment_info_metrics(
1611                &metadata_manager.catalog_controller,
1612                &metadata_manager.cluster_controller,
1613                &hummock_manager,
1614                meta_metrics.clone(),
1615            )
1616            .await;
1617
1618            refresh_relation_info_metrics(
1619                &metadata_manager.catalog_controller,
1620                meta_metrics.clone(),
1621            )
1622            .await;
1623
1624            refresh_database_info_metrics(
1625                &metadata_manager.catalog_controller,
1626                meta_metrics.clone(),
1627            )
1628            .await;
1629
1630            refresh_backfill_progress_metrics(
1631                &metadata_manager.catalog_controller,
1632                &hummock_manager,
1633                &barrier_manager,
1634                meta_metrics.clone(),
1635            )
1636            .await;
1637
1638            // System parameter info
1639            refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
1640                .await;
1641        }
1642    });
1643
1644    (join_handle, shutdown_tx)
1645}