use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicF64, GenericGaugeVec};
use prometheus::{
    GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use risingwave_common::catalog::{FragmentTypeFlag, TableId};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
    LabelGuardedUintGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_source_backfill, visit_stream_node_stream_scan,
};
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
};
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::{ObjectId, WorkerId};
use risingwave_object_store::object::object_metrics::{
    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
};
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::FragmentDistribution;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::ClusterControllerRef;
use crate::controller::system_param::SystemParamsControllerRef;
use crate::controller::utils::PartialFragmentStateTables;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::rpc::ElectionClientRef;
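
/// Static information about a backfill fragment, extracted from its stream node.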
struct BackfillFragmentInfo {
    job_id: u32,
    fragment_id: u32,
    backfill_state_table_id: u32,
    backfill_target_relation_id: u32,
    backfill_type: &'static str,
    backfill_epoch: u64,
}
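
/// Prometheus metrics exported by the meta node.
///
/// Cloning is cheap: the metric vectors share their underlying collectors, so
/// every clone observes the same values.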
#[derive(Clone)]
pub struct MetaMetrics {
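    // Cluster metrics.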
    pub worker_num: IntGaugeVec,
    pub meta_type: IntGaugeVec,
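
    // gRPC metrics.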
    pub grpc_latency: HistogramVec,
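
    // Barrier metrics.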
    pub barrier_latency: LabelGuardedHistogramVec,
    pub barrier_wait_commit_latency: Histogram,
    pub barrier_send_latency: LabelGuardedHistogramVec,
    pub all_barrier_nums: LabelGuardedIntGaugeVec,
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
    pub last_committed_barrier_time: IntGaugeVec,
    pub barrier_interval_by_database: GaugeVec,

    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec,
    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec,
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec,
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec,

    pub recovery_failure_cnt: IntCounterVec,
    pub recovery_latency: HistogramVec,
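
    // Hummock metrics.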
    pub max_committed_epoch: IntGauge,
    pub min_committed_epoch: IntGauge,
    pub level_sst_num: IntGaugeVec,
    pub level_compact_cnt: IntGaugeVec,
    pub compact_frequency: IntCounterVec,
    pub level_file_size: IntGaugeVec,
    pub version_size: IntGauge,
    pub current_version_id: IntGauge,
    pub checkpoint_version_id: IntGauge,
    pub min_pinned_version_id: IntGauge,
    pub min_safepoint_version_id: IntGauge,
    pub write_stop_compaction_groups: IntGaugeVec,
    pub full_gc_trigger_count: IntGauge,
    pub full_gc_candidate_object_count: Histogram,
    pub full_gc_selected_object_count: Histogram,
    pub version_stats: IntGaugeVec,
    pub materialized_view_stats: IntGaugeVec,
    pub stale_object_count: IntGauge,
    pub stale_object_size: IntGauge,
    pub old_version_object_count: IntGauge,
    pub old_version_object_size: IntGauge,
    pub time_travel_object_count: IntGauge,
    pub current_version_object_count: IntGauge,
    pub current_version_object_size: IntGauge,
    pub total_object_count: IntGauge,
    pub total_object_size: IntGauge,
    pub table_change_log_object_count: IntGaugeVec,
    pub table_change_log_object_size: IntGaugeVec,
    pub delta_log_count: IntGauge,
    pub version_checkpoint_latency: Histogram,
    pub hummock_manager_lock_time: HistogramVec,
    pub hummock_manager_real_process_time: HistogramVec,
    pub compact_skip_frequency: IntCounterVec,
    pub compact_pending_bytes: IntGaugeVec,
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    pub level_compact_task_cnt: IntGaugeVec,
    pub time_after_last_observation: Arc<AtomicU64>,
    pub l0_compact_level_count: HistogramVec,
    pub compact_task_size: HistogramVec,
    pub compact_task_file_count: HistogramVec,
    pub compact_task_batch_count: HistogramVec,
    pub split_compaction_group_count: IntCounterVec,
    pub state_table_count: IntGaugeVec,
    pub branched_sst_count: IntGaugeVec,
    pub compact_task_trivial_move_sst_count: HistogramVec,

    pub compaction_event_consumed_latency: Histogram,
    pub compaction_event_loop_iteration_latency: Histogram,
    pub time_travel_vacuum_metadata_latency: Histogram,
    pub time_travel_write_metadata_latency: Histogram,
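
    // Object store metrics, cloned from the global instance.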
    pub object_store_metric: Arc<ObjectStoreMetrics>,
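
    // Source metrics.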
    pub source_is_up: LabelGuardedIntGaugeVec,
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,
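
    // Catalog and fragment info metrics.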
    pub actor_info: IntGaugeVec,
    pub table_info: IntGaugeVec,
    pub sink_info: IntGaugeVec,
    pub relation_info: IntGaugeVec,
    pub backfill_fragment_progress: IntGaugeVec,

    pub system_param_info: IntGaugeVec,

    pub table_write_throughput: IntCounterVec,

    pub merge_compaction_group_count: IntCounterVec,

    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_latency: LabelGuardedHistogramVec,

    pub time_travel_version_replay_latency: Histogram,

    pub compaction_group_count: IntGauge,
    pub compaction_group_size: IntGaugeVec,
    pub compaction_group_file_count: IntGaugeVec,
    pub compaction_group_throughput: IntGaugeVec,

    pub refresh_job_duration: LabelGuardedUintGaugeVec,
    pub refresh_job_finish_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_trigger_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_miss_cnt: LabelGuardedIntCounterVec,
}
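
/// Process-wide [`MetaMetrics`] instance, lazily initialized and registered in
/// the global metrics registry on first access.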
pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl MetaMetrics {
    fn new(registry: &Registry) -> Self {
        let opts = histogram_opts!(
            "meta_grpc_duration_seconds",
            "gRPC latency of meta services",
            exponential_buckets(0.0001, 2.0, 20).unwrap()
        );
        let grpc_latency =
            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_duration_seconds",
            "barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let barrier_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();

        let opts = histogram_opts!(
            "meta_barrier_wait_commit_duration_seconds",
            "barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let barrier_wait_commit_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_send_duration_seconds",
            "barrier send latency",
            exponential_buckets(0.1, 1.5, 19).unwrap()
        );
        let barrier_send_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();
        let barrier_interval_by_database = register_gauge_vec_with_registry!(
            "meta_barrier_interval_by_database",
            "barrier interval of each database",
            &["database_id"],
            registry
        )
        .unwrap();

        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "all_barrier_nums",
            "num of all barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "in_flight_barrier_nums",
            "num of in-flight barriers",
            &["database_id"],
            registry
        )
        .unwrap();
        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
            "last_committed_barrier_time",
            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
            &["database_id"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_duration_seconds",
            "snapshot backfill barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "barrier_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
            "snapshot backfill barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let snapshot_backfill_wait_commit_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();

        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_upstream_lag",
            "snapshot backfill upstream_lag",
            &["table_id"],
            registry
        )
        .unwrap();
        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_inflight_barrier_num",
            "snapshot backfill inflight_barrier_num",
            &["table_id"],
            registry
        )
        .unwrap();

        let max_committed_epoch = register_int_gauge_with_registry!(
            "storage_max_committed_epoch",
            "max committed epoch",
            registry
        )
        .unwrap();

        let min_committed_epoch = register_int_gauge_with_registry!(
            "storage_min_committed_epoch",
            "min committed epoch",
            registry
        )
        .unwrap();

        let level_sst_num = register_int_gauge_vec_with_registry!(
            "storage_level_sst_num",
            "num of SSTs in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let level_compact_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_cnt",
            "num of SSTs to be merged to next level in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let compact_frequency = register_int_counter_vec_with_registry!(
            "storage_level_compact_frequency",
            "The number of compactions from one level to another level that have completed or failed.",
            &["compactor", "group", "task_type", "result"],
            registry
        )
        .unwrap();
        let compact_skip_frequency = register_int_counter_vec_with_registry!(
            "storage_skip_compact_frequency",
            "The number of compactions from one level to another level that have been skipped.",
            &["level", "type"],
            registry
        )
        .unwrap();

        let version_size =
            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
                .unwrap();

        let current_version_id = register_int_gauge_with_registry!(
            "storage_current_version_id",
            "current version id",
            registry
        )
        .unwrap();

        let checkpoint_version_id = register_int_gauge_with_registry!(
            "storage_checkpoint_version_id",
            "checkpoint version id",
            registry
        )
        .unwrap();

        let min_pinned_version_id = register_int_gauge_with_registry!(
            "storage_min_pinned_version_id",
            "min pinned version id",
            registry
        )
        .unwrap();

        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
            "storage_write_stop_compaction_groups",
            "compaction groups in write-stop state",
            &["compaction_group_id"],
            registry
        )
        .unwrap();

        let full_gc_trigger_count = register_int_gauge_with_registry!(
            "storage_full_gc_trigger_count",
            "the number of attempts to trigger full GC",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_candidate_object_count",
            "the number of candidate objects to delete after scanning the object store",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_candidate_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_selected_object_count",
            "the number of objects to delete after filtering by the meta node",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_selected_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let min_safepoint_version_id = register_int_gauge_with_registry!(
            "storage_min_safepoint_version_id",
            "min safepoint version id",
            registry
        )
        .unwrap();

        let level_file_size = register_int_gauge_vec_with_registry!(
            "storage_level_total_file_size",
            "total file size of each level, in KB",
            &["level_index"],
            registry
        )
        .unwrap();

        let version_stats = register_int_gauge_vec_with_registry!(
            "storage_version_stats",
            "per table stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let materialized_view_stats = register_int_gauge_vec_with_registry!(
            "storage_materialized_view_stats",
            "per materialized view stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let stale_object_count = register_int_gauge_with_registry!(
            "storage_stale_object_count",
            "total number of objects that are no longer referenced by any version.",
            registry
        )
        .unwrap();

        let stale_object_size = register_int_gauge_with_registry!(
            "storage_stale_object_size",
            "total size of objects that are no longer referenced by any version.",
            registry
        )
        .unwrap();

        let old_version_object_count = register_int_gauge_with_registry!(
            "storage_old_version_object_count",
            "total number of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let old_version_object_size = register_int_gauge_with_registry!(
            "storage_old_version_object_size",
            "total size of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let current_version_object_count = register_int_gauge_with_registry!(
            "storage_current_version_object_count",
            "total number of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let current_version_object_size = register_int_gauge_with_registry!(
            "storage_current_version_object_size",
            "total size of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let total_object_count = register_int_gauge_with_registry!(
            "storage_total_object_count",
            "Total number of objects, including dangling ones. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let total_object_size = register_int_gauge_with_registry!(
            "storage_total_object_size",
            "Total size of objects, including dangling ones. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_count",
            "per table change log object count",
            &["table_id"],
            registry
        )
        .unwrap();

        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_size",
            "per table change log object size",
            &["table_id"],
            registry
        )
        .unwrap();

        let time_travel_object_count = register_int_gauge_with_registry!(
            "storage_time_travel_object_count",
            "total number of objects that are referenced by time travel.",
            registry
        )
        .unwrap();

        let delta_log_count = register_int_gauge_with_registry!(
            "storage_delta_log_count",
            "total number of hummock version delta logs",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_version_checkpoint_latency",
            "hummock version checkpoint latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let version_checkpoint_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
            "hummock_manager_lock_time",
            "latency for the hummock manager to acquire the rwlock",
            &["lock_name", "lock_type"],
            registry
        )
        .unwrap();

        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
            "meta_hummock_manager_real_process_time",
            "latency for the hummock manager to actually process the request",
            &["method"],
            registry
        )
        .unwrap();

        let worker_num = register_int_gauge_vec_with_registry!(
            "worker_num",
            "number of nodes in the cluster",
            &["worker_type"],
            registry,
        )
        .unwrap();

        let meta_type = register_int_gauge_vec_with_registry!(
            "meta_num",
            "role of meta nodes in the cluster",
            &["worker_addr", "role"],
            registry,
        )
        .unwrap();

        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
            "storage_compact_pending_bytes",
            "bytes of the LSM tree that need compaction to reach balance",
            &["group"],
            registry
        )
        .unwrap();

        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
            "storage_compact_level_compression_ratio",
            "compression ratio of each level of the LSM tree",
            &["group", "level", "algorithm"],
            registry
        )
        .unwrap();

        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_task_cnt",
            "num of compact tasks, organized by group and level",
            &["task"],
            registry
        )
        .unwrap();

        let time_travel_vacuum_metadata_latency = register_histogram_with_registry!(
            histogram_opts!(
                "storage_time_travel_vacuum_metadata_latency",
                "Latency of vacuuming metadata for time travel",
                exponential_buckets(0.1, 1.5, 20).unwrap()
            ),
            registry
        )
        .unwrap();
        let time_travel_write_metadata_latency = register_histogram_with_registry!(
            histogram_opts!(
                "storage_time_travel_write_metadata_latency",
                "Latency of writing metadata for time travel",
                exponential_buckets(0.1, 1.5, 20).unwrap()
            ),
            registry
        )
        .unwrap();

        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());

        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
            "recovery_failure_cnt",
            "Number of failed recovery attempts",
            &["recovery_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "recovery_latency",
            "Latency of the recovery process",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let recovery_latency =
            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();

        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_failure_cnt",
            "Number of failed auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_success_cnt",
            "Number of successful auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "auto_schema_change_latency",
            "Latency of the auto schema change process",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
            "source_status_is_up",
            "source is up or not",
            &["source_id", "source_name"],
            registry
        )
        .unwrap();
        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());

        let actor_info = register_int_gauge_vec_with_registry!(
            "actor_info",
            "Mapping from actor id to (fragment id, compute node)",
            &["actor_id", "fragment_id", "compute_node"],
            registry
        )
        .unwrap();

        let table_info = register_int_gauge_vec_with_registry!(
            "table_info",
            "Mapping from table id to (materialized view id, fragment id, table name, table type, compaction group id)",
            &[
                "materialized_view_id",
                "table_id",
                "fragment_id",
                "table_name",
                "table_type",
                "compaction_group_id"
            ],
            registry
        )
        .unwrap();

        let sink_info = register_int_gauge_vec_with_registry!(
            "sink_info",
            "Mapping from actor id to (sink id, sink name)",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let relation_info = register_int_gauge_vec_with_registry!(
            "relation_info",
            "Information of the database relation (table/source/sink/materialized view/index/internal)",
            &["id", "database", "schema", "name", "resource_group", "type"],
            registry
        )
        .unwrap();

        let backfill_fragment_progress = register_int_gauge_vec_with_registry!(
            "backfill_fragment_progress",
            "Backfill progress per fragment",
            &[
                "job_id",
                "fragment_id",
                "backfill_state_table_id",
                "backfill_target_relation_id",
                "backfill_target_relation_name",
                "backfill_target_relation_type",
                "backfill_type",
                "backfill_epoch",
                "upstream_type",
                "backfill_progress",
            ],
            registry
        )
        .unwrap();

        let l0_compact_level_count = register_histogram_vec_with_registry!(
            "storage_l0_compact_level_count",
            "level count of l0 compact tasks",
            &["group", "type"],
            registry
        )
        .unwrap();

        let system_param_info = register_int_gauge_vec_with_registry!(
            "system_param_info",
            "Information of system parameters",
            &["name", "value"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_size",
            "Total size of compact tasks that have been issued to the state store",
            exponential_buckets(1048576.0, 2.0, 16).unwrap()
        );

        let compact_task_size =
            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();

        let compact_task_file_count = register_histogram_vec_with_registry!(
            "storage_compact_task_file_count",
            "file count of compact tasks",
            &["group", "type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "storage_compact_task_batch_count",
            "count of compact task batches",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_batch_count =
            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();

        let table_write_throughput = register_int_counter_vec_with_registry!(
            "storage_commit_write_throughput",
            "The accumulated write size of each table at commit time.",
            &["table_id"],
            registry
        )
        .unwrap();

        let split_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_split_compaction_group_count",
            "Count of triggered compaction group splits",
            &["group"],
            registry
        )
        .unwrap();

        let state_table_count = register_int_gauge_vec_with_registry!(
            "storage_state_table_count",
            "Count of state tables per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let branched_sst_count = register_int_gauge_vec_with_registry!(
            "storage_branched_sst_count",
            "Count of branched SSTs per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_consumed_latency",
            "The latency (ms) of each event being consumed",
            exponential_buckets(1.0, 1.5, 30).unwrap()
        );
        let compaction_event_consumed_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_loop_iteration_latency",
            "The latency (ms) of each iteration of the compaction event loop",
            exponential_buckets(1.0, 1.5, 30).unwrap()
        );
        let compaction_event_loop_iteration_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_merge_compaction_group_count",
            "Count of triggered compaction group merges",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_time_travel_version_replay_latency",
            "The latency (ms) of replaying a hummock version for time travel",
            exponential_buckets(0.01, 10.0, 6).unwrap()
        );
        let time_travel_version_replay_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let compaction_group_count = register_int_gauge_with_registry!(
            "storage_compaction_group_count",
            "The number of compaction groups",
            registry,
        )
        .unwrap();

        let compaction_group_size = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_size",
            "The size of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_file_count",
            "The file count of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_throughput",
            "The throughput of each compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_trivial_move_sst_count",
            "SST count of trivial-move compact tasks",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_trivial_move_sst_count =
            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();

        let refresh_job_duration = register_guarded_uint_gauge_vec_with_registry!(
            "meta_refresh_job_duration",
            "The duration of refresh jobs",
            &["table_id", "status"],
            registry
        )
        .unwrap();
        let refresh_job_finish_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_job_finish_cnt",
            "The number of finished refresh jobs",
            &["table_id", "status"],
            registry
        )
        .unwrap();
        let refresh_cron_job_trigger_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_cron_job_trigger_cnt",
            "The number of cron refresh jobs triggered",
            &["table_id"],
            registry
        )
        .unwrap();
        let refresh_cron_job_miss_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_cron_job_miss_cnt",
            "The number of cron refresh jobs missed",
            &["table_id"],
            registry
        )
        .unwrap();

        Self {
            grpc_latency,
            barrier_latency,
            barrier_wait_commit_latency,
            barrier_send_latency,
            all_barrier_nums,
            in_flight_barrier_nums,
            last_committed_barrier_time,
            barrier_interval_by_database,
            snapshot_backfill_barrier_latency,
            snapshot_backfill_wait_commit_latency,
            snapshot_backfill_lag,
            snapshot_backfill_inflight_barrier_num,
            recovery_failure_cnt,
            recovery_latency,

            max_committed_epoch,
            min_committed_epoch,
            level_sst_num,
            level_compact_cnt,
            compact_frequency,
            compact_skip_frequency,
            level_file_size,
            version_size,
            version_stats,
            materialized_view_stats,
            stale_object_count,
            stale_object_size,
            old_version_object_count,
            old_version_object_size,
            time_travel_object_count,
            current_version_object_count,
            current_version_object_size,
            total_object_count,
            total_object_size,
            table_change_log_object_count,
            table_change_log_object_size,
            delta_log_count,
            version_checkpoint_latency,
            current_version_id,
            checkpoint_version_id,
            min_pinned_version_id,
            min_safepoint_version_id,
            write_stop_compaction_groups,
            full_gc_trigger_count,
            full_gc_candidate_object_count,
            full_gc_selected_object_count,
            hummock_manager_lock_time,
            hummock_manager_real_process_time,
            time_after_last_observation: Arc::new(AtomicU64::new(0)),
            worker_num,
            meta_type,
            compact_pending_bytes,
            compact_level_compression_ratio,
            level_compact_task_cnt,
            object_store_metric,
            source_is_up,
            source_enumerator_metrics,
            actor_info,
            table_info,
            sink_info,
            relation_info,
            backfill_fragment_progress,
            system_param_info,
            l0_compact_level_count,
            compact_task_size,
            compact_task_file_count,
            compact_task_batch_count,
            compact_task_trivial_move_sst_count,
            table_write_throughput,
            split_compaction_group_count,
            state_table_count,
            branched_sst_count,
            compaction_event_consumed_latency,
            compaction_event_loop_iteration_latency,
            auto_schema_change_failure_cnt,
            auto_schema_change_success_cnt,
            auto_schema_change_latency,
            merge_compaction_group_count,
            time_travel_version_replay_latency,
            compaction_group_count,
            compaction_group_size,
            compaction_group_file_count,
            compaction_group_throughput,
            refresh_job_duration,
            refresh_job_finish_cnt,
            refresh_cron_job_trigger_cnt,
            refresh_cron_job_miss_cnt,
            time_travel_vacuum_metadata_latency,
            time_travel_write_metadata_latency,
        }
    }

    #[cfg(test)]
    pub fn for_test(registry: &Registry) -> Self {
        Self::new(registry)
    }
}

impl Default for MetaMetrics {
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}
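
/// Refreshes the `system_param_info` gauge with the current value of every
/// system parameter.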
pub async fn refresh_system_param_info_metrics(
    system_params_controller: &SystemParamsControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let params_info = system_params_controller.get_params().await.get_all();

    meta_metrics.system_param_info.reset();
    for info in params_info {
        meta_metrics
            .system_param_info
            .with_label_values(&[info.name, &info.value])
            .set(1);
    }
}
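
/// Spawns a task that periodically reports the number of nodes per worker type
/// and the role (leader/follower) of each meta node. Returns the join handle
/// and a oneshot sender that stops the loop.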
pub fn start_worker_info_monitor(
    metadata_manager: MetadataManager,
    election_client: ElectionClientRef,
    interval: Duration,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval = tokio::time::interval(interval);
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                _ = monitor_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Worker number monitor is stopped");
                    return;
                }
            }

            let node_map = match metadata_manager.count_worker_node().await {
                Ok(node_map) => node_map,
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
                    continue;
                }
            };

            meta_metrics.worker_num.reset();
            meta_metrics.meta_type.reset();

            for (worker_type, worker_num) in node_map {
                meta_metrics
                    .worker_num
                    .with_label_values(&[worker_type.as_str_name()])
                    .set(worker_num as i64);
            }
            if let Ok(meta_members) = election_client.get_members().await {
                meta_metrics
                    .worker_num
                    .with_label_values(&[WorkerType::Meta.as_str_name()])
                    .set(meta_members.len() as i64);
                meta_members.into_iter().for_each(|m| {
                    let role = if m.is_leader { "leader" } else { "follower" };
                    meta_metrics
                        .meta_type
                        .with_label_values(&[m.id.as_str(), role])
                        .set(1);
                });
            }
        }
    });

    (join_handle, shutdown_tx)
}
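
/// Refreshes the actor, table, and sink info gauges from the current catalog
/// and cluster state.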
pub async fn refresh_fragment_info_metrics(
    catalog_controller: &CatalogControllerRef,
    cluster_controller: &ClusterControllerRef,
    hummock_manager: &HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let worker_nodes = match cluster_controller
        .list_workers(Some(WorkerType::ComputeNode.into()), None)
        .await
    {
        Ok(worker_nodes) => worker_nodes,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list worker node");
            return;
        }
    };
    let actor_locations = match catalog_controller.list_actor_locations() {
        Ok(actor_locations) => actor_locations,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
            return;
        }
    };
    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
        Ok(sink_actor_mapping) => sink_actor_mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
            return;
        }
    };
    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
        Ok(fragment_state_tables) => fragment_state_tables,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
            return;
        }
    };
    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
        Ok(mapping) => mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
            return;
        }
    };

    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
        .into_iter()
        .map(|worker_node| {
            let addr = match worker_node.host {
                Some(host) => format!("{}:{}", host.host, host.port),
                None => "".to_owned(),
            };
            (worker_node.id, addr)
        })
        .collect();
    let table_compaction_group_id_mapping = hummock_manager
        .get_table_compaction_group_id_mapping()
        .await;

    meta_metrics.actor_info.reset();
    meta_metrics.table_info.reset();
    meta_metrics.sink_info.reset();
    for actor_location in actor_locations {
        let actor_id_str = actor_location.actor_id.to_string();
        let fragment_id_str = actor_location.fragment_id.to_string();
        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
            meta_metrics
                .actor_info
                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
                .set(1);
        }
    }
    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
        let sink_id_str = sink_id.to_string();
        for actor_id in actor_ids {
            let actor_id_str = actor_id.to_string();
            meta_metrics
                .sink_info
                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
                .set(1);
        }
    }
    for PartialFragmentStateTables {
        fragment_id,
        job_id,
        state_table_ids,
    } in fragment_state_tables
    {
        let fragment_id_str = fragment_id.to_string();
        let job_id_str = job_id.to_string();
        for table_id in state_table_ids.into_inner() {
            let table_id_str = table_id.to_string();
            let (table_name, table_type) = table_name_and_type_mapping
                .get(&table_id)
                .cloned()
                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
            let compaction_group_id = table_compaction_group_id_mapping
                .get(&table_id)
                .map(|cg_id| cg_id.to_string())
                .unwrap_or_else(|| "unknown".to_owned());
            meta_metrics
                .table_info
                .with_label_values(&[
                    &job_id_str,
                    &table_id_str,
                    &fragment_id_str,
                    &table_name,
                    &table_type,
                    &compaction_group_id,
                ])
                .set(1);
        }
    }
}
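
/// Refreshes the `relation_info` gauge for tables, sources, and sinks.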
pub async fn refresh_relation_info_metrics(
    catalog_controller: &CatalogControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let table_objects = match catalog_controller.list_table_objects().await {
        Ok(table_objects) => table_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table objects");
            return;
        }
    };

    let source_objects = match catalog_controller.list_source_objects().await {
        Ok(source_objects) => source_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get source objects");
            return;
        }
    };

    let sink_objects = match catalog_controller.list_sink_objects().await {
        Ok(sink_objects) => sink_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
            return;
        }
    };

    meta_metrics.relation_info.reset();

    for (id, db, schema, name, resource_group, table_type) in table_objects {
        let relation_type = match table_type {
            TableType::Table => "table",
            TableType::MaterializedView => "materialized_view",
            TableType::Index | TableType::VectorIndex => "index",
            TableType::Internal => "internal",
        };
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                relation_type,
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in source_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[&id.to_string(), &db, &schema, &name, &resource_group, "source"])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in sink_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[&id.to_string(), &db, &schema, &name, &resource_group, "sink"])
            .set(1);
    }
}
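
/// Determines whether a fragment is a backfill fragment and, if so, extracts
/// its backfill metadata from the stream node.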
fn extract_backfill_fragment_info(
    distribution: &FragmentDistribution,
) -> Option<BackfillFragmentInfo> {
    let backfill_type =
        if distribution.fragment_type_mask & FragmentTypeFlag::SourceScan as u32 != 0 {
            "SOURCE"
        } else if distribution.fragment_type_mask
            & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32
                | FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32)
            != 0
        {
            "SNAPSHOT_BACKFILL"
        } else if distribution.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0 {
            "ARRANGEMENT_OR_NO_SHUFFLE"
        } else {
            return None;
        };

    let stream_node = distribution.node.as_ref()?;
    let mut info = None;
    match backfill_type {
        "SOURCE" => {
            visit_stream_node_source_backfill(stream_node, |node| {
                info = Some(BackfillFragmentInfo {
                    job_id: distribution.table_id.as_raw_id(),
                    fragment_id: distribution.fragment_id.as_raw_id(),
                    backfill_state_table_id: node
                        .state_table
                        .as_ref()
                        .map(|table| table.id.as_raw_id())
                        .unwrap_or_default(),
                    backfill_target_relation_id: node.upstream_source_id.as_raw_id(),
                    backfill_type,
                    backfill_epoch: 0,
                });
            });
        }
        "SNAPSHOT_BACKFILL" | "ARRANGEMENT_OR_NO_SHUFFLE" => {
            visit_stream_node_stream_scan(stream_node, |node| {
                info = Some(BackfillFragmentInfo {
                    job_id: distribution.table_id.as_raw_id(),
                    fragment_id: distribution.fragment_id.as_raw_id(),
                    backfill_state_table_id: node
                        .state_table
                        .as_ref()
                        .map(|table| table.id.as_raw_id())
                        .unwrap_or_default(),
                    backfill_target_relation_id: node.table_id.as_raw_id(),
                    backfill_type,
                    backfill_epoch: node.snapshot_backfill_epoch.unwrap_or_default(),
                });
            });
        }
        _ => {}
    }

    info
}
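
/// Refreshes the `backfill_fragment_progress` gauge by joining backfill
/// fragment info from the catalog with the progress reported by the barrier
/// manager.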
pub async fn refresh_backfill_progress_metrics(
    catalog_controller: &CatalogControllerRef,
    hummock_manager: &HummockManagerRef,
    barrier_manager: &BarrierManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let fragment_descs = match catalog_controller.list_fragment_descs_with_node(true).await {
        Ok(fragment_descs) => fragment_descs,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list creating fragment descs");
            return;
        }
    };

    let backfill_infos: HashMap<(u32, u32), BackfillFragmentInfo> = fragment_descs
        .iter()
        .filter_map(|(distribution, _)| extract_backfill_fragment_info(distribution))
        .map(|info| ((info.job_id, info.fragment_id), info))
        .collect();

    let fragment_progresses = match barrier_manager.get_fragment_backfill_progress().await {
        Ok(progress) => progress,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment backfill progress");
            return;
        }
    };

    let progress_by_fragment: HashMap<(u32, u32), _> = fragment_progresses
        .into_iter()
        .map(|progress| {
            (
                (
                    progress.job_id.as_raw_id(),
                    progress.fragment_id.as_raw_id(),
                ),
                progress,
            )
        })
        .collect();

    let version_stats = hummock_manager.get_version_stats().await;

    let relation_ids: HashSet<_> = backfill_infos
        .values()
        .map(|info| ObjectId::new(info.backfill_target_relation_id))
        .collect();
    let relation_objects = match catalog_controller
        .list_relation_objects_by_ids(&relation_ids)
        .await
    {
        Ok(relation_objects) => relation_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get relation objects");
            return;
        }
    };

    let mut relation_info = HashMap::new();
    for (id, db, schema, name, rel_type) in relation_objects {
        relation_info.insert(id.as_raw_id(), (db, schema, name, rel_type));
    }

    meta_metrics.backfill_fragment_progress.reset();

    for info in backfill_infos.values() {
        let progress = progress_by_fragment.get(&(info.job_id, info.fragment_id));
        let (db, schema, name, rel_type) = relation_info
            .get(&info.backfill_target_relation_id)
            .cloned()
            .unwrap_or_else(|| {
                (
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                )
            });

        let job_id_str = info.job_id.to_string();
        let fragment_id_str = info.fragment_id.to_string();
        let backfill_state_table_id_str = info.backfill_state_table_id.to_string();
        let backfill_target_relation_id_str = info.backfill_target_relation_id.to_string();
        let backfill_target_relation_name_str = format!("{db}.{schema}.{name}");
        let backfill_target_relation_type_str = rel_type;
        let backfill_type_str = info.backfill_type.to_owned();
        let backfill_epoch_str = info.backfill_epoch.to_string();
        let total_key_count = version_stats
            .table_stats
            .get(&TableId::new(info.backfill_target_relation_id))
            .map(|stats| stats.total_key_count as u64);

        let progress_label = match (info.backfill_type, progress) {
            ("SOURCE", Some(progress)) => format!("{} consumed rows", progress.consumed_rows),
            ("SOURCE", None) => "0 consumed rows".to_owned(),
            (_, Some(progress)) if progress.done => {
                let total = total_key_count.unwrap_or(0);
                format!("100.0000% ({}/{})", total, total)
            }
            (_, Some(progress)) => {
                let total = total_key_count.unwrap_or(0);
                if total == 0 {
                    "100.0000% (0/0)".to_owned()
                } else {
                    let raw = (progress.consumed_rows as f64) / (total as f64) * 100.0;
                    format!(
                        "{:.4}% ({}/{})",
                        raw.min(100.0),
                        progress.consumed_rows,
                        total
                    )
                }
            }
            (_, None) => "0.0000% (0/0)".to_owned(),
        };
        let upstream_type_str = progress
            .map(|progress| progress.upstream_type.to_string())
            .unwrap_or_else(|| "Unknown".to_owned());

        meta_metrics
            .backfill_fragment_progress
            .with_label_values(&[
                &job_id_str,
                &fragment_id_str,
                &backfill_state_table_id_str,
                &backfill_target_relation_id_str,
                &backfill_target_relation_name_str,
                &backfill_target_relation_type_str,
                &backfill_type_str,
                &backfill_epoch_str,
                &upstream_type_str,
                &progress_label,
            ])
            .set(1);
    }
}
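
/// Spawns a task that periodically refreshes the fragment, relation, backfill
/// progress, and system parameter info metrics. Returns the join handle and a
/// oneshot sender that stops the loop.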
pub fn start_info_monitor(
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    barrier_manager: BarrierManagerRef,
    system_params_controller: SystemParamsControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    const COLLECT_INTERVAL_SECONDS: u64 = 60;

    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval =
            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                _ = monitor_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Meta info monitor is stopped");
                    return;
                }
            }

            refresh_fragment_info_metrics(
                &metadata_manager.catalog_controller,
                &metadata_manager.cluster_controller,
                &hummock_manager,
                meta_metrics.clone(),
            )
            .await;

            refresh_relation_info_metrics(
                &metadata_manager.catalog_controller,
                meta_metrics.clone(),
            )
            .await;

            refresh_backfill_progress_metrics(
                &metadata_manager.catalog_controller,
                &hummock_manager,
                &barrier_manager,
                meta_metrics.clone(),
            )
            .await;

            refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
                .await;
        }
    });

    (join_handle, shutdown_tx)
}