1use std::collections::{HashMap, HashSet};
16use std::sync::atomic::AtomicU64;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicF64, GenericGaugeVec};
21use prometheus::{
22 GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
23 exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
24 register_histogram_vec_with_registry, register_histogram_with_registry,
25 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
26 register_int_gauge_with_registry,
27};
28use risingwave_common::catalog::{FragmentTypeFlag, TableId};
29use risingwave_common::metrics::{
30 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
31 LabelGuardedUintGaugeVec,
32};
33use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
34use risingwave_common::system_param::reader::SystemParamsRead;
35use risingwave_common::util::stream_graph_visitor::{
36 visit_stream_node_source_backfill, visit_stream_node_stream_scan,
37};
38use risingwave_common::{
39 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
40 register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
41};
42use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
43use risingwave_meta_model::table::TableType;
44use risingwave_meta_model::{ObjectId, WorkerId};
45use risingwave_object_store::object::object_metrics::{
46 GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
47};
48use risingwave_pb::common::WorkerType;
49use risingwave_pb::meta::FragmentDistribution;
50use thiserror_ext::AsReport;
51use tokio::sync::oneshot::Sender;
52use tokio::task::JoinHandle;
53
54use crate::barrier::BarrierManagerRef;
55use crate::controller::catalog::CatalogControllerRef;
56use crate::controller::cluster::ClusterControllerRef;
57use crate::controller::system_param::SystemParamsControllerRef;
58use crate::controller::utils::PartialFragmentStateTables;
59use crate::hummock::HummockManagerRef;
60use crate::manager::MetadataManager;
61use crate::rpc::ElectionClientRef;
62
/// Per-fragment backfill bookkeeping used when reporting backfill progress.
///
/// NOTE(review): none of these fields are read within this chunk; the meanings
/// below are inferred from the field names and the label set of
/// `backfill_fragment_progress` — confirm against the code that populates it.
struct BackfillFragmentInfo {
    // Id of the streaming job this backfill fragment belongs to.
    job_id: u32,
    // Id of the fragment performing the backfill.
    fragment_id: u32,
    // State table tracking the backfill progress for this fragment.
    backfill_state_table_id: u32,
    // Upstream relation being backfilled from.
    backfill_target_relation_id: u32,
    // Static descriptor of the backfill kind (used as a metric label).
    backfill_type: &'static str,
    // Epoch associated with the backfill — presumably the snapshot epoch; TODO confirm.
    backfill_epoch: u64,
}
71
/// All Prometheus metrics owned by the meta node, registered once against a
/// single registry (see [`MetaMetrics::new`]). `Clone` is derived so the
/// process-global instance can be handed out cheaply.
#[derive(Clone)]
pub struct MetaMetrics {
    // ---- cluster topology ----
    // Number of workers in the cluster, labeled by worker type.
    pub worker_num: IntGaugeVec,
    // Role (leader/follower) of each meta node, labeled by address and role.
    pub meta_type: IntGaugeVec,

    // ---- gRPC ----
    // Latency of meta gRPC services, labeled by request path.
    pub grpc_latency: HistogramVec,

    // ---- barrier ----
    // End-to-end barrier latency per database.
    pub barrier_latency: LabelGuardedHistogramVec,
    // Time a barrier waits for commit.
    pub barrier_wait_commit_latency: Histogram,
    // Barrier send latency per database.
    pub barrier_send_latency: LabelGuardedHistogramVec,
    // Count of all in-progress barriers per database.
    pub all_barrier_nums: LabelGuardedIntGaugeVec,
    // Count of in-flight barriers per database.
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
    // UNIX timestamp (seconds) of the last committed barrier's epoch, per database.
    pub last_committed_barrier_time: IntGaugeVec,
    // Configured barrier interval per database.
    pub barrier_interval_by_database: GaugeVec,

    // ---- snapshot backfill ----
    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec,
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec,
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec,

    // ---- recovery ----
    pub recovery_failure_cnt: IntCounterVec,
    pub recovery_latency: HistogramVec,

    // ---- hummock / storage ----
    pub max_committed_epoch: IntGauge,
    pub min_committed_epoch: IntGauge,
    pub level_sst_num: IntGaugeVec,
    pub level_compact_cnt: IntGaugeVec,
    pub compact_frequency: IntCounterVec,
    pub level_file_size: IntGaugeVec,
    pub version_size: IntGauge,
    pub current_version_id: IntGauge,
    pub checkpoint_version_id: IntGauge,
    pub min_pinned_version_id: IntGauge,
    pub min_safepoint_version_id: IntGauge,
    pub write_stop_compaction_groups: IntGaugeVec,
    pub full_gc_trigger_count: IntGauge,
    pub full_gc_candidate_object_count: Histogram,
    pub full_gc_selected_object_count: Histogram,
    pub version_stats: IntGaugeVec,
    pub materialized_view_stats: IntGaugeVec,
    pub stale_object_count: IntGauge,
    pub stale_object_size: IntGauge,
    pub old_version_object_count: IntGauge,
    pub old_version_object_size: IntGauge,
    pub time_travel_object_count: IntGauge,
    pub current_version_object_count: IntGauge,
    pub current_version_object_size: IntGauge,
    pub total_object_count: IntGauge,
    pub total_object_size: IntGauge,
    pub table_change_log_object_count: IntGaugeVec,
    pub table_change_log_object_size: IntGaugeVec,
    pub table_change_log_min_epoch: IntGaugeVec,
    pub delta_log_count: IntGauge,
    pub version_checkpoint_latency: Histogram,
    pub hummock_manager_lock_time: HistogramVec,
    pub hummock_manager_real_process_time: HistogramVec,
    pub compact_skip_frequency: IntCounterVec,
    pub compact_pending_bytes: IntGaugeVec,
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    pub level_compact_task_cnt: IntGaugeVec,
    // Wall-clock marker updated by observers; not a Prometheus metric itself.
    pub time_after_last_observation: Arc<AtomicU64>,
    pub l0_compact_level_count: HistogramVec,
    pub compact_task_size: HistogramVec,
    pub compact_task_file_count: HistogramVec,
    pub compact_task_batch_count: HistogramVec,
    pub split_compaction_group_count: IntCounterVec,
    pub state_table_count: IntGaugeVec,
    pub branched_sst_count: IntGaugeVec,
    pub compact_task_trivial_move_sst_count: HistogramVec,

    pub compaction_event_consumed_latency: Histogram,
    pub compaction_event_loop_iteration_latency: Histogram,
    pub time_travel_vacuum_metadata_latency: Histogram,
    pub time_travel_write_metadata_latency: Histogram,

    // ---- object store ----
    // Shared handle to the process-global object store metrics.
    pub object_store_metric: Arc<ObjectStoreMetrics>,

    // ---- source ----
    pub source_is_up: LabelGuardedIntGaugeVec,
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

    // ---- catalog / topology info gauges (value is always 1; data lives in labels) ----
    pub actor_info: IntGaugeVec,
    pub table_info: IntGaugeVec,
    pub sink_info: IntGaugeVec,
    pub relation_info: IntGaugeVec,
    pub database_info: IntGaugeVec,
    pub backfill_fragment_progress: IntGaugeVec,

    pub system_param_info: IntGaugeVec,

    pub table_write_throughput: IntCounterVec,

    pub merge_compaction_group_count: IntCounterVec,

    // ---- auto schema change ----
    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_latency: LabelGuardedHistogramVec,

    pub time_travel_version_replay_latency: Histogram,

    // ---- compaction groups ----
    pub compaction_group_count: IntGauge,
    pub compaction_group_size: IntGaugeVec,
    pub compaction_group_file_count: IntGaugeVec,
    pub compaction_group_throughput: IntGaugeVec,

    // ---- refreshable tables ----
    pub refresh_job_duration: LabelGuardedUintGaugeVec,
    pub refresh_job_finish_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_trigger_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_miss_cnt: LabelGuardedIntCounterVec,
}
256
/// Process-global [`MetaMetrics`], lazily constructed against the global
/// metrics registry on first access.
pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));
259
260impl MetaMetrics {
261 fn new(registry: &Registry) -> Self {
262 let opts = histogram_opts!(
263 "meta_grpc_duration_seconds",
264 "gRPC latency of meta services",
265 exponential_buckets(0.0001, 2.0, 20).unwrap() );
267 let grpc_latency =
268 register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();
269
270 let opts = histogram_opts!(
271 "meta_barrier_duration_seconds",
272 "barrier latency",
273 exponential_buckets(0.1, 1.5, 20).unwrap() );
275 let barrier_latency =
276 register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
277 .unwrap();
278
279 let opts = histogram_opts!(
280 "meta_barrier_wait_commit_duration_seconds",
281 "barrier_wait_commit_latency",
282 exponential_buckets(0.1, 1.5, 20).unwrap() );
284 let barrier_wait_commit_latency =
285 register_histogram_with_registry!(opts, registry).unwrap();
286
287 let opts = histogram_opts!(
288 "meta_barrier_send_duration_seconds",
289 "barrier send latency",
290 exponential_buckets(0.1, 1.5, 19).unwrap() );
292 let barrier_send_latency =
293 register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
294 .unwrap();
295 let barrier_interval_by_database = register_gauge_vec_with_registry!(
296 "meta_barrier_interval_by_database",
297 "barrier interval of each database",
298 &["database_id"],
299 registry
300 )
301 .unwrap();
302
303 let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
304 "all_barrier_nums",
305 "num of of all_barrier",
306 &["database_id"],
307 registry
308 )
309 .unwrap();
310 let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
311 "in_flight_barrier_nums",
312 "num of of in_flight_barrier",
313 &["database_id"],
314 registry
315 )
316 .unwrap();
317 let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
318 "last_committed_barrier_time",
319 "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
320 &["database_id"],
321 registry
322 )
323 .unwrap();
324
325 let opts = histogram_opts!(
327 "meta_snapshot_backfill_barrier_duration_seconds",
328 "snapshot backfill barrier latency",
329 exponential_buckets(0.1, 1.5, 20).unwrap() );
331 let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
332 opts,
333 &["table_id", "barrier_type"],
334 registry
335 )
336 .unwrap();
337
338 let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
339 "meta_snapshot_backfill_upstream_lag",
340 "snapshot backfill upstream_lag",
341 &["table_id"],
342 registry
343 )
344 .unwrap();
345 let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
346 "meta_snapshot_backfill_inflight_barrier_num",
347 "snapshot backfill inflight_barrier_num",
348 &["table_id"],
349 registry
350 )
351 .unwrap();
352
353 let max_committed_epoch = register_int_gauge_with_registry!(
354 "storage_max_committed_epoch",
355 "max committed epoch",
356 registry
357 )
358 .unwrap();
359
360 let min_committed_epoch = register_int_gauge_with_registry!(
361 "storage_min_committed_epoch",
362 "min committed epoch",
363 registry
364 )
365 .unwrap();
366
367 let level_sst_num = register_int_gauge_vec_with_registry!(
368 "storage_level_sst_num",
369 "num of SSTs in each level",
370 &["level_index"],
371 registry
372 )
373 .unwrap();
374
375 let level_compact_cnt = register_int_gauge_vec_with_registry!(
376 "storage_level_compact_cnt",
377 "num of SSTs to be merged to next level in each level",
378 &["level_index"],
379 registry
380 )
381 .unwrap();
382
383 let compact_frequency = register_int_counter_vec_with_registry!(
384 "storage_level_compact_frequency",
385 "The number of compactions from one level to another level that have completed or failed.",
386 &["compactor", "group", "task_type", "result"],
387 registry
388 )
389 .unwrap();
390 let compact_skip_frequency = register_int_counter_vec_with_registry!(
391 "storage_skip_compact_frequency",
392 "The number of compactions from one level to another level that have been skipped.",
393 &["level", "type"],
394 registry
395 )
396 .unwrap();
397
398 let version_size =
399 register_int_gauge_with_registry!("storage_version_size", "version size", registry)
400 .unwrap();
401
402 let current_version_id = register_int_gauge_with_registry!(
403 "storage_current_version_id",
404 "current version id",
405 registry
406 )
407 .unwrap();
408
409 let checkpoint_version_id = register_int_gauge_with_registry!(
410 "storage_checkpoint_version_id",
411 "checkpoint version id",
412 registry
413 )
414 .unwrap();
415
416 let min_pinned_version_id = register_int_gauge_with_registry!(
417 "storage_min_pinned_version_id",
418 "min pinned version id",
419 registry
420 )
421 .unwrap();
422
423 let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
424 "storage_write_stop_compaction_groups",
425 "compaction groups of write stop state",
426 &["compaction_group_id"],
427 registry
428 )
429 .unwrap();
430
431 let full_gc_trigger_count = register_int_gauge_with_registry!(
432 "storage_full_gc_trigger_count",
433 "the number of attempts to trigger full GC",
434 registry
435 )
436 .unwrap();
437
438 let opts = histogram_opts!(
439 "storage_full_gc_candidate_object_count",
440 "the number of candidate object to delete after scanning object store",
441 exponential_buckets(1.0, 10.0, 6).unwrap()
442 );
443 let full_gc_candidate_object_count =
444 register_histogram_with_registry!(opts, registry).unwrap();
445
446 let opts = histogram_opts!(
447 "storage_full_gc_selected_object_count",
448 "the number of object to delete after filtering by meta node",
449 exponential_buckets(1.0, 10.0, 6).unwrap()
450 );
451 let full_gc_selected_object_count =
452 register_histogram_with_registry!(opts, registry).unwrap();
453
454 let min_safepoint_version_id = register_int_gauge_with_registry!(
455 "storage_min_safepoint_version_id",
456 "min safepoint version id",
457 registry
458 )
459 .unwrap();
460
461 let level_file_size = register_int_gauge_vec_with_registry!(
462 "storage_level_total_file_size",
463 "KBs total file bytes in each level",
464 &["level_index"],
465 registry
466 )
467 .unwrap();
468
469 let version_stats = register_int_gauge_vec_with_registry!(
470 "storage_version_stats",
471 "per table stats in current hummock version",
472 &["table_id", "metric"],
473 registry
474 )
475 .unwrap();
476
477 let materialized_view_stats = register_int_gauge_vec_with_registry!(
478 "storage_materialized_view_stats",
479 "per materialized view stats in current hummock version",
480 &["table_id", "metric"],
481 registry
482 )
483 .unwrap();
484
485 let stale_object_count = register_int_gauge_with_registry!(
486 "storage_stale_object_count",
487 "total number of objects that is no longer referenced by versions.",
488 registry
489 )
490 .unwrap();
491
492 let stale_object_size = register_int_gauge_with_registry!(
493 "storage_stale_object_size",
494 "total size of objects that is no longer referenced by versions.",
495 registry
496 )
497 .unwrap();
498
499 let old_version_object_count = register_int_gauge_with_registry!(
500 "storage_old_version_object_count",
501 "total number of objects that is still referenced by non-current versions",
502 registry
503 )
504 .unwrap();
505
506 let old_version_object_size = register_int_gauge_with_registry!(
507 "storage_old_version_object_size",
508 "total size of objects that is still referenced by non-current versions",
509 registry
510 )
511 .unwrap();
512
513 let current_version_object_count = register_int_gauge_with_registry!(
514 "storage_current_version_object_count",
515 "total number of objects that is referenced by current version",
516 registry
517 )
518 .unwrap();
519
520 let current_version_object_size = register_int_gauge_with_registry!(
521 "storage_current_version_object_size",
522 "total size of objects that is referenced by current version",
523 registry
524 )
525 .unwrap();
526
527 let total_object_count = register_int_gauge_with_registry!(
528 "storage_total_object_count",
529 "Total number of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
530 registry
531 ).unwrap();
532
533 let total_object_size = register_int_gauge_with_registry!(
534 "storage_total_object_size",
535 "Total size of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
536 registry
537 ).unwrap();
538
539 let table_change_log_object_count = register_int_gauge_vec_with_registry!(
540 "storage_table_change_log_object_count",
541 "per table change log object count",
542 &["table_id"],
543 registry
544 )
545 .unwrap();
546
547 let table_change_log_object_size = register_int_gauge_vec_with_registry!(
548 "storage_table_change_log_object_size",
549 "per table change log object size",
550 &["table_id"],
551 registry
552 )
553 .unwrap();
554
555 let table_change_log_min_epoch = register_int_gauge_vec_with_registry!(
556 "storage_table_change_log_min_epoch",
557 "min epoch currently retained in table change log",
558 &["table_id"],
559 registry
560 )
561 .unwrap();
562
563 let time_travel_object_count = register_int_gauge_with_registry!(
564 "storage_time_travel_object_count",
565 "total number of objects that is referenced by time travel.",
566 registry
567 )
568 .unwrap();
569
570 let delta_log_count = register_int_gauge_with_registry!(
571 "storage_delta_log_count",
572 "total number of hummock version delta log",
573 registry
574 )
575 .unwrap();
576
577 let opts = histogram_opts!(
578 "storage_version_checkpoint_latency",
579 "hummock version checkpoint latency",
580 exponential_buckets(0.1, 1.5, 20).unwrap()
581 );
582 let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();
583
584 let hummock_manager_lock_time = register_histogram_vec_with_registry!(
585 "hummock_manager_lock_time",
586 "latency for hummock manager to acquire the rwlock",
587 &["lock_name", "lock_type"],
588 registry
589 )
590 .unwrap();
591
592 let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
593 "meta_hummock_manager_real_process_time",
594 "latency for hummock manager to really process the request",
595 &["method"],
596 registry
597 )
598 .unwrap();
599
600 let worker_num = register_int_gauge_vec_with_registry!(
601 "worker_num",
602 "number of nodes in the cluster",
603 &["worker_type"],
604 registry,
605 )
606 .unwrap();
607
608 let meta_type = register_int_gauge_vec_with_registry!(
609 "meta_num",
610 "role of meta nodes in the cluster",
611 &["worker_addr", "role"],
612 registry,
613 )
614 .unwrap();
615
616 let compact_pending_bytes = register_int_gauge_vec_with_registry!(
617 "storage_compact_pending_bytes",
618 "bytes of lsm tree needed to reach balance",
619 &["group"],
620 registry
621 )
622 .unwrap();
623
624 let compact_level_compression_ratio = register_gauge_vec_with_registry!(
625 "storage_compact_level_compression_ratio",
626 "compression ratio of each level of the lsm tree",
627 &["group", "level", "algorithm"],
628 registry
629 )
630 .unwrap();
631
632 let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
633 "storage_level_compact_task_cnt",
634 "num of compact_task organized by group and level",
635 &["task"],
636 registry
637 )
638 .unwrap();
639
640 let time_travel_vacuum_metadata_latency = register_histogram_with_registry!(
641 histogram_opts!(
642 "storage_time_travel_vacuum_metadata_latency",
643 "Latency of vacuuming metadata for time travel",
644 exponential_buckets(0.1, 1.5, 20).unwrap()
645 ),
646 registry
647 )
648 .unwrap();
649 let time_travel_write_metadata_latency = register_histogram_with_registry!(
650 histogram_opts!(
651 "storage_time_travel_write_metadata_latency",
652 "Latency of writing metadata for time travel",
653 exponential_buckets(0.1, 1.5, 20).unwrap()
654 ),
655 registry
656 )
657 .unwrap();
658
659 let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
660
661 let recovery_failure_cnt = register_int_counter_vec_with_registry!(
662 "recovery_failure_cnt",
663 "Number of failed recovery attempts",
664 &["recovery_type"],
665 registry
666 )
667 .unwrap();
668 let opts = histogram_opts!(
669 "recovery_latency",
670 "Latency of the recovery process",
671 exponential_buckets(0.1, 1.5, 20).unwrap() );
673 let recovery_latency =
674 register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();
675
676 let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
677 "auto_schema_change_failure_cnt",
678 "Number of failed auto schema change",
679 &["table_id", "table_name"],
680 registry
681 )
682 .unwrap();
683
684 let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
685 "auto_schema_change_success_cnt",
686 "Number of success auto schema change",
687 &["table_id", "table_name"],
688 registry
689 )
690 .unwrap();
691
692 let opts = histogram_opts!(
693 "auto_schema_change_latency",
694 "Latency of the auto schema change process",
695 exponential_buckets(0.1, 1.5, 20).unwrap() );
697 let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
698 opts,
699 &["table_id", "table_name"],
700 registry
701 )
702 .unwrap();
703
704 let source_is_up = register_guarded_int_gauge_vec_with_registry!(
705 "source_status_is_up",
706 "source is up or not",
707 &["source_id", "source_name"],
708 registry
709 )
710 .unwrap();
711 let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());
712
713 let actor_info = register_int_gauge_vec_with_registry!(
714 "actor_info",
715 "Mapping from actor id to (fragment id, compute node)",
716 &["actor_id", "fragment_id", "compute_node"],
717 registry
718 )
719 .unwrap();
720
721 let table_info = register_int_gauge_vec_with_registry!(
722 "table_info",
723 "Mapping from table id to (actor id, table name)",
724 &[
725 "materialized_view_id",
726 "table_id",
727 "fragment_id",
728 "table_name",
729 "table_type",
730 "compaction_group_id"
731 ],
732 registry
733 )
734 .unwrap();
735
736 let sink_info = register_int_gauge_vec_with_registry!(
737 "sink_info",
738 "Mapping from actor id to (actor id, sink name)",
739 &["actor_id", "sink_id", "sink_name",],
740 registry
741 )
742 .unwrap();
743
744 let relation_info = register_int_gauge_vec_with_registry!(
745 "relation_info",
746 "Information of the database relation (table/source/sink/materialized view/index/internal)",
747 &["id", "database", "schema", "name", "resource_group", "type"],
748 registry
749 )
750 .unwrap();
751
752 let database_info = register_int_gauge_vec_with_registry!(
753 "database_info",
754 "Mapping from database id to database name",
755 &["database_id", "database_name"],
756 registry
757 )
758 .unwrap();
759
760 let backfill_fragment_progress = register_int_gauge_vec_with_registry!(
761 "backfill_fragment_progress",
762 "Backfill progress per fragment",
763 &[
764 "job_id",
765 "fragment_id",
766 "backfill_state_table_id",
767 "backfill_target_relation_id",
768 "backfill_target_relation_name",
769 "backfill_target_relation_type",
770 "backfill_type",
771 "backfill_epoch",
772 "upstream_type",
773 "backfill_progress",
774 ],
775 registry
776 )
777 .unwrap();
778
779 let l0_compact_level_count = register_histogram_vec_with_registry!(
780 "storage_l0_compact_level_count",
781 "level_count of l0 compact task",
782 &["group", "type"],
783 registry
784 )
785 .unwrap();
786
787 let system_param_info = register_int_gauge_vec_with_registry!(
789 "system_param_info",
790 "Information of system parameters",
791 &["name", "value"],
792 registry
793 )
794 .unwrap();
795
796 let opts = histogram_opts!(
797 "storage_compact_task_size",
798 "Total size of compact that have been issued to state store",
799 exponential_buckets(1048576.0, 2.0, 16).unwrap()
800 );
801
802 let compact_task_size =
803 register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();
804
805 let compact_task_file_count = register_histogram_vec_with_registry!(
806 "storage_compact_task_file_count",
807 "file count of compact task",
808 &["group", "type"],
809 registry
810 )
811 .unwrap();
812 let opts = histogram_opts!(
813 "storage_compact_task_batch_count",
814 "count of compact task batch",
815 exponential_buckets(1.0, 2.0, 8).unwrap()
816 );
817 let compact_task_batch_count =
818 register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();
819
820 let table_write_throughput = register_int_counter_vec_with_registry!(
821 "storage_commit_write_throughput",
822 "The number of compactions from one level to another level that have been skipped.",
823 &["table_id"],
824 registry
825 )
826 .unwrap();
827
828 let split_compaction_group_count = register_int_counter_vec_with_registry!(
829 "storage_split_compaction_group_count",
830 "Count of trigger split compaction group",
831 &["group"],
832 registry
833 )
834 .unwrap();
835
836 let state_table_count = register_int_gauge_vec_with_registry!(
837 "storage_state_table_count",
838 "Count of stable table per compaction group",
839 &["group"],
840 registry
841 )
842 .unwrap();
843
844 let branched_sst_count = register_int_gauge_vec_with_registry!(
845 "storage_branched_sst_count",
846 "Count of branched sst per compaction group",
847 &["group"],
848 registry
849 )
850 .unwrap();
851
852 let opts = histogram_opts!(
853 "storage_compaction_event_consumed_latency",
854 "The latency(ms) of each event being consumed",
855 exponential_buckets(1.0, 1.5, 30).unwrap() );
857 let compaction_event_consumed_latency =
858 register_histogram_with_registry!(opts, registry).unwrap();
859
860 let opts = histogram_opts!(
861 "storage_compaction_event_loop_iteration_latency",
862 "The latency(ms) of each iteration of the compaction event loop",
863 exponential_buckets(1.0, 1.5, 30).unwrap() );
865 let compaction_event_loop_iteration_latency =
866 register_histogram_with_registry!(opts, registry).unwrap();
867
868 let merge_compaction_group_count = register_int_counter_vec_with_registry!(
869 "storage_merge_compaction_group_count",
870 "Count of trigger merge compaction group",
871 &["group"],
872 registry
873 )
874 .unwrap();
875
876 let opts = histogram_opts!(
877 "storage_time_travel_version_replay_latency",
878 "The latency(ms) of replaying a hummock version for time travel",
879 exponential_buckets(0.01, 10.0, 6).unwrap()
880 );
881 let time_travel_version_replay_latency =
882 register_histogram_with_registry!(opts, registry).unwrap();
883
884 let compaction_group_count = register_int_gauge_with_registry!(
885 "storage_compaction_group_count",
886 "The number of compaction groups",
887 registry,
888 )
889 .unwrap();
890
891 let compaction_group_size = register_int_gauge_vec_with_registry!(
892 "storage_compaction_group_size",
893 "The size of compaction group",
894 &["group"],
895 registry
896 )
897 .unwrap();
898
899 let compaction_group_file_count = register_int_gauge_vec_with_registry!(
900 "storage_compaction_group_file_count",
901 "The file count of compaction group",
902 &["group"],
903 registry
904 )
905 .unwrap();
906
907 let compaction_group_throughput = register_int_gauge_vec_with_registry!(
908 "storage_compaction_group_throughput",
909 "The throughput of compaction group",
910 &["group"],
911 registry
912 )
913 .unwrap();
914
915 let opts = histogram_opts!(
916 "storage_compact_task_trivial_move_sst_count",
917 "sst count of compact trivial-move task",
918 exponential_buckets(1.0, 2.0, 8).unwrap()
919 );
920 let compact_task_trivial_move_sst_count =
921 register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();
922
923 let refresh_job_duration = register_guarded_uint_gauge_vec_with_registry!(
924 "meta_refresh_job_duration",
925 "The duration of refresh job",
926 &["table_id", "status"],
927 registry
928 )
929 .unwrap();
930 let refresh_job_finish_cnt = register_guarded_int_counter_vec_with_registry!(
931 "meta_refresh_job_finish_cnt",
932 "The number of finished refresh jobs",
933 &["table_id", "status"],
934 registry
935 )
936 .unwrap();
937 let refresh_cron_job_trigger_cnt = register_guarded_int_counter_vec_with_registry!(
938 "meta_refresh_cron_job_trigger_cnt",
939 "The number of cron refresh jobs triggered",
940 &["table_id"],
941 registry
942 )
943 .unwrap();
944 let refresh_cron_job_miss_cnt = register_guarded_int_counter_vec_with_registry!(
945 "meta_refresh_cron_job_miss_cnt",
946 "The number of cron refresh jobs missed",
947 &["table_id"],
948 registry
949 )
950 .unwrap();
951
952 Self {
953 grpc_latency,
954 barrier_latency,
955 barrier_wait_commit_latency,
956 barrier_send_latency,
957 all_barrier_nums,
958 in_flight_barrier_nums,
959 last_committed_barrier_time,
960 barrier_interval_by_database,
961 snapshot_backfill_barrier_latency,
962 snapshot_backfill_lag,
963 snapshot_backfill_inflight_barrier_num,
964 recovery_failure_cnt,
965 recovery_latency,
966
967 max_committed_epoch,
968 min_committed_epoch,
969 level_sst_num,
970 level_compact_cnt,
971 compact_frequency,
972 compact_skip_frequency,
973 level_file_size,
974 version_size,
975 version_stats,
976 materialized_view_stats,
977 stale_object_count,
978 stale_object_size,
979 old_version_object_count,
980 old_version_object_size,
981 time_travel_object_count,
982 current_version_object_count,
983 current_version_object_size,
984 total_object_count,
985 total_object_size,
986 table_change_log_object_count,
987 table_change_log_object_size,
988 table_change_log_min_epoch,
989 delta_log_count,
990 version_checkpoint_latency,
991 current_version_id,
992 checkpoint_version_id,
993 min_pinned_version_id,
994 min_safepoint_version_id,
995 write_stop_compaction_groups,
996 full_gc_trigger_count,
997 full_gc_candidate_object_count,
998 full_gc_selected_object_count,
999 hummock_manager_lock_time,
1000 hummock_manager_real_process_time,
1001 time_after_last_observation: Arc::new(AtomicU64::new(0)),
1002 worker_num,
1003 meta_type,
1004 compact_pending_bytes,
1005 compact_level_compression_ratio,
1006 level_compact_task_cnt,
1007 object_store_metric,
1008 source_is_up,
1009 source_enumerator_metrics,
1010 actor_info,
1011 table_info,
1012 sink_info,
1013 relation_info,
1014 database_info,
1015 backfill_fragment_progress,
1016 system_param_info,
1017 l0_compact_level_count,
1018 compact_task_size,
1019 compact_task_file_count,
1020 compact_task_batch_count,
1021 compact_task_trivial_move_sst_count,
1022 table_write_throughput,
1023 split_compaction_group_count,
1024 state_table_count,
1025 branched_sst_count,
1026 compaction_event_consumed_latency,
1027 compaction_event_loop_iteration_latency,
1028 auto_schema_change_failure_cnt,
1029 auto_schema_change_success_cnt,
1030 auto_schema_change_latency,
1031 merge_compaction_group_count,
1032 time_travel_version_replay_latency,
1033 compaction_group_count,
1034 compaction_group_size,
1035 compaction_group_file_count,
1036 compaction_group_throughput,
1037 refresh_job_duration,
1038 refresh_job_finish_cnt,
1039 refresh_cron_job_trigger_cnt,
1040 refresh_cron_job_miss_cnt,
1041 time_travel_vacuum_metadata_latency,
1042 time_travel_write_metadata_latency,
1043 }
1044 }
1045
1046 #[cfg(test)]
1047 pub fn for_test(registry: &Registry) -> Self {
1048 Self::new(registry)
1049 }
1050}
impl Default for MetaMetrics {
    /// Defaults to a clone of the lazily-initialized process-global metrics
    /// ([`GLOBAL_META_METRICS`]).
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}
1056
1057pub async fn refresh_system_param_info_metrics(
1059 system_params_controller: &SystemParamsControllerRef,
1060 meta_metrics: Arc<MetaMetrics>,
1061) {
1062 let params_info = system_params_controller.get_params().await.get_all();
1063
1064 meta_metrics.system_param_info.reset();
1065 for info in params_info {
1066 meta_metrics
1067 .system_param_info
1068 .with_label_values(&[info.name, &info.value])
1069 .set(1);
1070 }
1071}
1072
1073pub fn start_worker_info_monitor(
1074 metadata_manager: MetadataManager,
1075 election_client: ElectionClientRef,
1076 interval: Duration,
1077 meta_metrics: Arc<MetaMetrics>,
1078) -> (JoinHandle<()>, Sender<()>) {
1079 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1080 let join_handle = tokio::spawn(async move {
1081 let mut monitor_interval = tokio::time::interval(interval);
1082 monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1083 loop {
1084 tokio::select! {
1085 _ = monitor_interval.tick() => {},
1087 _ = &mut shutdown_rx => {
1089 tracing::info!("Worker number monitor is stopped");
1090 return;
1091 }
1092 }
1093
1094 let node_map = match metadata_manager.count_worker_node().await {
1095 Ok(node_map) => node_map,
1096 Err(err) => {
1097 tracing::warn!(error = %err.as_report(), "fail to count worker node");
1098 continue;
1099 }
1100 };
1101
1102 meta_metrics.worker_num.reset();
1104 meta_metrics.meta_type.reset();
1105
1106 for (worker_type, worker_num) in node_map {
1107 meta_metrics
1108 .worker_num
1109 .with_label_values(&[(worker_type.as_str_name())])
1110 .set(worker_num as i64);
1111 }
1112 if let Ok(meta_members) = election_client.get_members().await {
1113 meta_metrics
1114 .worker_num
1115 .with_label_values(&[WorkerType::Meta.as_str_name()])
1116 .set(meta_members.len() as i64);
1117 meta_members.into_iter().for_each(|m| {
1118 let role = if m.is_leader { "leader" } else { "follower" };
1119 meta_metrics
1120 .meta_type
1121 .with_label_values(&[m.id.as_str(), role])
1122 .set(1);
1123 });
1124 }
1125 }
1126 });
1127
1128 (join_handle, shutdown_tx)
1129}
1130
/// Refreshes the per-actor (`actor_info`), per-state-table (`table_info`) and
/// per-sink-actor (`sink_info`) gauges from the current catalog, cluster and
/// hummock state. Each gauge entry is set to `1`; the families are reset
/// first so removed entities disappear.
///
/// All inputs are fetched up front; if any lookup fails, a warning is logged
/// and the refresh is aborted *before* the gauges are reset, so the
/// previously published values stay in place.
pub async fn refresh_fragment_info_metrics(
    catalog_controller: &CatalogControllerRef,
    cluster_controller: &ClusterControllerRef,
    hummock_manager: &HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    // Actor addresses are resolved against compute nodes only.
    let worker_nodes = match cluster_controller
        .list_workers(Some(WorkerType::ComputeNode.into()), None)
        .await
    {
        Ok(worker_nodes) => worker_nodes,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list worker node");
            return;
        }
    };
    let actor_locations = match catalog_controller.list_actor_locations() {
        Ok(actor_locations) => actor_locations,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
            return;
        }
    };
    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
        Ok(sink_actor_mapping) => sink_actor_mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
            return;
        }
    };
    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
        Ok(fragment_state_tables) => fragment_state_tables,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
            return;
        }
    };
    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
        Ok(mapping) => mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
            return;
        }
    };

    // Worker id -> "host:port"; an empty string when the host is unknown.
    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
        .into_iter()
        .map(|worker_node| {
            let addr = match worker_node.host {
                Some(host) => format!("{}:{}", host.host, host.port),
                None => "".to_owned(),
            };
            (worker_node.id, addr)
        })
        .collect();
    let table_compaction_group_id_mapping = hummock_manager
        .get_table_compaction_group_id_mapping()
        .await;

    // All inputs gathered successfully — now it is safe to clear the gauges.
    meta_metrics.actor_info.reset();
    meta_metrics.table_info.reset();
    meta_metrics.sink_info.reset();
    for actor_location in actor_locations {
        let actor_id_str = actor_location.actor_id.to_string();
        let fragment_id_str = actor_location.fragment_id.to_string();
        // Actors on workers not in the compute-node list are silently skipped.
        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
            meta_metrics
                .actor_info
                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
                .set(1);
        }
    }
    // One `sink_info` entry per (actor, sink) pair.
    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
        let sink_id_str = sink_id.to_string();
        for actor_id in actor_ids {
            let actor_id_str = actor_id.to_string();
            meta_metrics
                .sink_info
                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
                .set(1);
        }
    }
    // One `table_info` entry per state table of each fragment, enriched with
    // the table's name/type and its compaction group; missing lookups fall
    // back to the literal "unknown".
    for PartialFragmentStateTables {
        fragment_id,
        job_id,
        state_table_ids,
    } in fragment_state_tables
    {
        let fragment_id_str = fragment_id.to_string();
        let job_id_str = job_id.to_string();
        for table_id in state_table_ids.into_inner() {
            let table_id_str = table_id.to_string();
            let (table_name, table_type) = table_name_and_type_mapping
                .get(&table_id)
                .cloned()
                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
            let compaction_group_id = table_compaction_group_id_mapping
                .get(&table_id)
                .map(|cg_id| cg_id.to_string())
                .unwrap_or_else(|| "unknown".to_owned());
            meta_metrics
                .table_info
                .with_label_values(&[
                    &job_id_str,
                    &table_id_str,
                    &fragment_id_str,
                    &table_name,
                    &table_type,
                    &compaction_group_id,
                ])
                .set(1);
        }
    }
}
1249
1250pub async fn refresh_relation_info_metrics(
1251 catalog_controller: &CatalogControllerRef,
1252 meta_metrics: Arc<MetaMetrics>,
1253) {
1254 let table_objects = match catalog_controller.list_table_objects().await {
1255 Ok(table_objects) => table_objects,
1256 Err(err) => {
1257 tracing::warn!(error=%err.as_report(), "fail to get table objects");
1258 return;
1259 }
1260 };
1261
1262 let source_objects = match catalog_controller.list_source_objects().await {
1263 Ok(source_objects) => source_objects,
1264 Err(err) => {
1265 tracing::warn!(error=%err.as_report(), "fail to get source objects");
1266 return;
1267 }
1268 };
1269
1270 let sink_objects = match catalog_controller.list_sink_objects().await {
1271 Ok(sink_objects) => sink_objects,
1272 Err(err) => {
1273 tracing::warn!(error=%err.as_report(), "fail to get sink objects");
1274 return;
1275 }
1276 };
1277
1278 meta_metrics.relation_info.reset();
1279
1280 for (id, db, schema, name, resource_group, table_type) in table_objects {
1281 let relation_type = match table_type {
1282 TableType::Table => "table",
1283 TableType::MaterializedView => "materialized_view",
1284 TableType::Index | TableType::VectorIndex => "index",
1285 TableType::Internal => "internal",
1286 };
1287 meta_metrics
1288 .relation_info
1289 .with_label_values(&[
1290 &id.to_string(),
1291 &db,
1292 &schema,
1293 &name,
1294 &resource_group,
1295 &relation_type.to_owned(),
1296 ])
1297 .set(1);
1298 }
1299
1300 for (id, db, schema, name, resource_group) in source_objects {
1301 meta_metrics
1302 .relation_info
1303 .with_label_values(&[
1304 &id.to_string(),
1305 &db,
1306 &schema,
1307 &name,
1308 &resource_group,
1309 &"source".to_owned(),
1310 ])
1311 .set(1);
1312 }
1313
1314 for (id, db, schema, name, resource_group) in sink_objects {
1315 meta_metrics
1316 .relation_info
1317 .with_label_values(&[
1318 &id.to_string(),
1319 &db,
1320 &schema,
1321 &name,
1322 &resource_group,
1323 &"sink".to_owned(),
1324 ])
1325 .set(1);
1326 }
1327}
1328
1329pub async fn refresh_database_info_metrics(
1330 catalog_controller: &CatalogControllerRef,
1331 meta_metrics: Arc<MetaMetrics>,
1332) {
1333 let databases = match catalog_controller.list_databases().await {
1334 Ok(databases) => databases,
1335 Err(err) => {
1336 tracing::warn!(error=%err.as_report(), "fail to get databases");
1337 return;
1338 }
1339 };
1340
1341 meta_metrics.database_info.reset();
1342
1343 for db in databases {
1344 meta_metrics
1345 .database_info
1346 .with_label_values(&[&db.id.to_string(), &db.name])
1347 .set(1);
1348 }
1349}
1350
1351fn extract_backfill_fragment_info(
1352 distribution: &FragmentDistribution,
1353) -> Option<BackfillFragmentInfo> {
1354 let backfill_type =
1355 if distribution.fragment_type_mask & FragmentTypeFlag::SourceScan as u32 != 0 {
1356 "SOURCE"
1357 } else if distribution.fragment_type_mask
1358 & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32
1359 | FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32)
1360 != 0
1361 {
1362 "SNAPSHOT_BACKFILL"
1363 } else if distribution.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0 {
1364 "ARRANGEMENT_OR_NO_SHUFFLE"
1365 } else {
1366 return None;
1367 };
1368
1369 let stream_node = distribution.node.as_ref()?;
1370 let mut info = None;
1371 match backfill_type {
1372 "SOURCE" => {
1373 visit_stream_node_source_backfill(stream_node, |node| {
1374 info = Some(BackfillFragmentInfo {
1375 job_id: distribution.table_id.as_raw_id(),
1376 fragment_id: distribution.fragment_id.as_raw_id(),
1377 backfill_state_table_id: node
1378 .state_table
1379 .as_ref()
1380 .map(|table| table.id.as_raw_id())
1381 .unwrap_or_default(),
1382 backfill_target_relation_id: node.upstream_source_id.as_raw_id(),
1383 backfill_type,
1384 backfill_epoch: 0,
1385 });
1386 });
1387 }
1388 "SNAPSHOT_BACKFILL" | "ARRANGEMENT_OR_NO_SHUFFLE" => {
1389 visit_stream_node_stream_scan(stream_node, |node| {
1390 info = Some(BackfillFragmentInfo {
1391 job_id: distribution.table_id.as_raw_id(),
1392 fragment_id: distribution.fragment_id.as_raw_id(),
1393 backfill_state_table_id: node
1394 .state_table
1395 .as_ref()
1396 .map(|table| table.id.as_raw_id())
1397 .unwrap_or_default(),
1398 backfill_target_relation_id: node.table_id.as_raw_id(),
1399 backfill_type,
1400 backfill_epoch: node.snapshot_backfill_epoch.unwrap_or_default(),
1401 });
1402 });
1403 }
1404 _ => {}
1405 }
1406
1407 info
1408}
1409
/// Refreshes the `backfill_fragment_progress` gauge: one entry per backfill
/// fragment, labeled with job/fragment/state-table ids, the backfill target
/// relation's id/name/type, the backfill kind and epoch, the upstream type,
/// and a human-readable progress string.
///
/// Any lookup failure logs a warning and aborts the refresh *before* the
/// gauge is reset, keeping the previously published values.
pub async fn refresh_backfill_progress_metrics(
    catalog_controller: &CatalogControllerRef,
    hummock_manager: &HummockManagerRef,
    barrier_manager: &BarrierManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    // NOTE(review): the `true` flag presumably restricts the listing to
    // fragments of *creating* jobs (cf. the warning text) — confirm against
    // `list_fragment_descs_with_node`.
    let fragment_descs = match catalog_controller.list_fragment_descs_with_node(true).await {
        Ok(fragment_descs) => fragment_descs,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list creating fragment descs");
            return;
        }
    };

    // Backfill fragments keyed by (job_id, fragment_id) raw ids.
    let backfill_infos: HashMap<(u32, u32), BackfillFragmentInfo> = fragment_descs
        .iter()
        .filter_map(|(distribution, _)| extract_backfill_fragment_info(distribution))
        .map(|info| ((info.job_id, info.fragment_id), info))
        .collect();

    let fragment_progresses = match barrier_manager.get_fragment_backfill_progress().await {
        Ok(progress) => progress,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment backfill progress");
            return;
        }
    };

    // Progress reports keyed the same way, so they can be joined with
    // `backfill_infos` below.
    let progress_by_fragment: HashMap<(u32, u32), _> = fragment_progresses
        .into_iter()
        .map(|progress| {
            (
                (
                    progress.job_id.as_raw_id(),
                    progress.fragment_id.as_raw_id(),
                ),
                progress,
            )
        })
        .collect();

    let version_stats = hummock_manager.get_version_stats().await;

    // Resolve display info (db/schema/name/type) for every backfill target.
    let relation_ids: HashSet<_> = backfill_infos
        .values()
        .map(|info| ObjectId::new(info.backfill_target_relation_id))
        .collect();
    let relation_objects = match catalog_controller
        .list_relation_objects_by_ids(&relation_ids)
        .await
    {
        Ok(relation_objects) => relation_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get relation objects");
            return;
        }
    };

    let mut relation_info = HashMap::new();
    for (id, db, schema, name, rel_type) in relation_objects {
        relation_info.insert(id.as_raw_id(), (db, schema, name, rel_type));
    }

    meta_metrics.backfill_fragment_progress.reset();

    for info in backfill_infos.values() {
        let progress = progress_by_fragment.get(&(info.job_id, info.fragment_id));
        // Relations that vanished between the lookups degrade to "unknown"
        // labels rather than being dropped.
        let (db, schema, name, rel_type) = relation_info
            .get(&info.backfill_target_relation_id)
            .cloned()
            .unwrap_or_else(|| {
                (
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                )
            });

        let job_id_str = info.job_id.to_string();
        let fragment_id_str = info.fragment_id.to_string();
        let backfill_state_table_id_str = info.backfill_state_table_id.to_string();
        let backfill_target_relation_id_str = info.backfill_target_relation_id.to_string();
        let backfill_target_relation_name_str = format!("{db}.{schema}.{name}");
        let backfill_target_relation_type_str = rel_type;
        let backfill_type_str = info.backfill_type.to_owned();
        let backfill_epoch_str = info.backfill_epoch.to_string();
        // Upstream row count used as the denominator of the progress ratio;
        // `None` when the target relation has no stats entry.
        let total_key_count = version_stats
            .table_stats
            .get(&TableId::new(info.backfill_target_relation_id))
            .map(|stats| stats.total_key_count as u64);

        // Human-readable progress: source backfills only report consumed
        // rows (no meaningful total); scan backfills report a percentage,
        // capped at 100% since consumed rows can overshoot the stats-based
        // total.
        let progress_label = match (info.backfill_type, progress) {
            ("SOURCE", Some(progress)) => format!("{} consumed rows", progress.consumed_rows),
            ("SOURCE", None) => "0 consumed rows".to_owned(),
            (_, Some(progress)) if progress.done => {
                let total = total_key_count.unwrap_or(0);
                format!("100.0000% ({}/{})", total, total)
            }
            (_, Some(progress)) => {
                let total = total_key_count.unwrap_or(0);
                if total == 0 {
                    // Empty upstream: treat as already complete.
                    "100.0000% (0/0)".to_owned()
                } else {
                    let raw = (progress.consumed_rows as f64) / (total as f64) * 100.0;
                    format!(
                        "{:.4}% ({}/{})",
                        raw.min(100.0),
                        progress.consumed_rows,
                        total
                    )
                }
            }
            (_, None) => "0.0000% (0/0)".to_owned(),
        };
        let upstream_type_str = progress
            .map(|progress| progress.upstream_type.to_string())
            .unwrap_or_else(|| "Unknown".to_owned());

        meta_metrics
            .backfill_fragment_progress
            .with_label_values(&[
                &job_id_str,
                &fragment_id_str,
                &backfill_state_table_id_str,
                &backfill_target_relation_id_str,
                &backfill_target_relation_name_str,
                &backfill_target_relation_type_str,
                &backfill_type_str,
                &backfill_epoch_str,
                &upstream_type_str,
                &progress_label,
            ])
            .set(1);
    }
}
1546
1547pub fn start_info_monitor(
1548 metadata_manager: MetadataManager,
1549 hummock_manager: HummockManagerRef,
1550 barrier_manager: BarrierManagerRef,
1551 system_params_controller: SystemParamsControllerRef,
1552 meta_metrics: Arc<MetaMetrics>,
1553) -> (JoinHandle<()>, Sender<()>) {
1554 const COLLECT_INTERVAL_SECONDS: u64 = 60;
1555
1556 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1557 let join_handle = tokio::spawn(async move {
1558 let mut monitor_interval =
1559 tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
1560 monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1561 loop {
1562 tokio::select! {
1563 _ = monitor_interval.tick() => {},
1565 _ = &mut shutdown_rx => {
1567 tracing::info!("Meta info monitor is stopped");
1568 return;
1569 }
1570 }
1571
1572 refresh_fragment_info_metrics(
1574 &metadata_manager.catalog_controller,
1575 &metadata_manager.cluster_controller,
1576 &hummock_manager,
1577 meta_metrics.clone(),
1578 )
1579 .await;
1580
1581 refresh_relation_info_metrics(
1582 &metadata_manager.catalog_controller,
1583 meta_metrics.clone(),
1584 )
1585 .await;
1586
1587 refresh_database_info_metrics(
1588 &metadata_manager.catalog_controller,
1589 meta_metrics.clone(),
1590 )
1591 .await;
1592
1593 refresh_backfill_progress_metrics(
1594 &metadata_manager.catalog_controller,
1595 &hummock_manager,
1596 &barrier_manager,
1597 meta_metrics.clone(),
1598 )
1599 .await;
1600
1601 refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
1603 .await;
1604 }
1605 });
1606
1607 (join_handle, shutdown_tx)
1608}