use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use prometheus::core::{AtomicF64, GenericGaugeVec};
use prometheus::{
    GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
    register_int_gauge_with_registry,
};
use risingwave_common::catalog::{FragmentTypeFlag, TableId};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
    LabelGuardedUintGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_source_backfill, visit_stream_node_stream_scan,
};
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
};
use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::{ObjectId, WorkerId};
use risingwave_object_store::object::object_metrics::{
    GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
};
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::FragmentDistribution;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::CatalogControllerRef;
use crate::controller::cluster::ClusterControllerRef;
use crate::controller::system_param::SystemParamsControllerRef;
use crate::controller::utils::PartialFragmentStateTables;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::rpc::ElectionClientRef;

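/// Backfill metadata extracted from a fragment's stream node, used to label the
/// `backfill_fragment_progress` gauge.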
struct BackfillFragmentInfo {
    job_id: u32,
    fragment_id: u32,
    backfill_state_table_id: u32,
    backfill_target_relation_id: u32,
    backfill_type: &'static str,
    backfill_epoch: u64,
}

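/// Prometheus metrics reported by the meta node, covering the cluster, barriers,
/// Hummock storage, sources, and catalog info.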
#[derive(Clone)]
pub struct MetaMetrics {
    pub worker_num: IntGaugeVec,
    pub meta_type: IntGaugeVec,

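    /// gRPC latency of handling meta service requests, labeled by RPC path.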
    pub grpc_latency: HistogramVec,

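    // Barrier-related metrics, mostly labeled by database id.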
    pub barrier_latency: LabelGuardedHistogramVec,
    pub barrier_wait_commit_latency: Histogram,
    pub barrier_send_latency: LabelGuardedHistogramVec,
    pub all_barrier_nums: LabelGuardedIntGaugeVec,
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
    pub last_committed_barrier_time: IntGaugeVec,
    pub barrier_interval_by_database: GaugeVec,

    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec,
    pub snapshot_backfill_wait_commit_latency: LabelGuardedHistogramVec,
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec,
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec,

    pub recovery_failure_cnt: IntCounterVec,
    pub recovery_latency: HistogramVec,

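    // Hummock (state store) and compaction metrics.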
    pub max_committed_epoch: IntGauge,
    pub min_committed_epoch: IntGauge,
    pub level_sst_num: IntGaugeVec,
    pub level_compact_cnt: IntGaugeVec,
    pub compact_frequency: IntCounterVec,
    pub level_file_size: IntGaugeVec,
    pub version_size: IntGauge,
    pub current_version_id: IntGauge,
    pub checkpoint_version_id: IntGauge,
    pub min_pinned_version_id: IntGauge,
    pub min_safepoint_version_id: IntGauge,
    pub write_stop_compaction_groups: IntGaugeVec,
    pub full_gc_trigger_count: IntGauge,
    pub full_gc_candidate_object_count: Histogram,
    pub full_gc_selected_object_count: Histogram,
    pub version_stats: IntGaugeVec,
    pub materialized_view_stats: IntGaugeVec,
    pub stale_object_count: IntGauge,
    pub stale_object_size: IntGauge,
    pub old_version_object_count: IntGauge,
    pub old_version_object_size: IntGauge,
    pub time_travel_object_count: IntGauge,
    pub current_version_object_count: IntGauge,
    pub current_version_object_size: IntGauge,
    pub total_object_count: IntGauge,
    pub total_object_size: IntGauge,
    pub table_change_log_object_count: IntGaugeVec,
    pub table_change_log_object_size: IntGaugeVec,
    pub table_change_log_min_epoch: IntGaugeVec,
    pub delta_log_count: IntGauge,
    pub version_checkpoint_latency: Histogram,
    pub hummock_manager_lock_time: HistogramVec,
    pub hummock_manager_real_process_time: HistogramVec,
    pub compact_skip_frequency: IntCounterVec,
    pub compact_pending_bytes: IntGaugeVec,
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    pub level_compact_task_cnt: IntGaugeVec,
    pub time_after_last_observation: Arc<AtomicU64>,
    pub l0_compact_level_count: HistogramVec,
    pub compact_task_size: HistogramVec,
    pub compact_task_file_count: HistogramVec,
    pub compact_task_batch_count: HistogramVec,
    pub split_compaction_group_count: IntCounterVec,
    pub state_table_count: IntGaugeVec,
    pub branched_sst_count: IntGaugeVec,
    pub compact_task_trivial_move_sst_count: HistogramVec,

    pub compaction_event_consumed_latency: Histogram,
    pub compaction_event_loop_iteration_latency: Histogram,
    pub time_travel_vacuum_metadata_latency: Histogram,
    pub time_travel_write_metadata_latency: Histogram,

    pub object_store_metric: Arc<ObjectStoreMetrics>,

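    // Source metrics.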
    pub source_is_up: LabelGuardedIntGaugeVec,
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

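    // Catalog and fragment info, exported as gauges whose labels carry the data
    // (the value is always 1).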
    pub actor_info: IntGaugeVec,
    pub table_info: IntGaugeVec,
    pub sink_info: IntGaugeVec,
    pub relation_info: IntGaugeVec,
    pub backfill_fragment_progress: IntGaugeVec,

    pub system_param_info: IntGaugeVec,

    pub table_write_throughput: IntCounterVec,

    pub merge_compaction_group_count: IntCounterVec,

    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    pub auto_schema_change_latency: LabelGuardedHistogramVec,

    pub time_travel_version_replay_latency: Histogram,

    pub compaction_group_count: IntGauge,
    pub compaction_group_size: IntGaugeVec,
    pub compaction_group_file_count: IntGaugeVec,
    pub compaction_group_throughput: IntGaugeVec,

    pub refresh_job_duration: LabelGuardedUintGaugeVec,
    pub refresh_job_finish_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_trigger_cnt: LabelGuardedIntCounterVec,
    pub refresh_cron_job_miss_cnt: LabelGuardedIntCounterVec,
}

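/// Process-wide [`MetaMetrics`], lazily registered against [`GLOBAL_METRICS_REGISTRY`]
/// on first access. A minimal usage sketch (label values are illustrative):
///
/// ```ignore
/// GLOBAL_META_METRICS
///     .worker_num
///     .with_label_values(&["COMPUTE_NODE"])
///     .set(3);
/// ```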
pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
    LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl MetaMetrics {
    fn new(registry: &Registry) -> Self {
        let opts = histogram_opts!(
            "meta_grpc_duration_seconds",
            "gRPC latency of meta services",
            exponential_buckets(0.0001, 2.0, 20).unwrap()
        );
        let grpc_latency =
            register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_duration_seconds",
            "barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let barrier_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();

        let opts = histogram_opts!(
            "meta_barrier_wait_commit_duration_seconds",
            "barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let barrier_wait_commit_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "meta_barrier_send_duration_seconds",
            "barrier send latency",
            exponential_buckets(0.1, 1.5, 19).unwrap()
        );
        let barrier_send_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
                .unwrap();
        let barrier_interval_by_database = register_gauge_vec_with_registry!(
            "meta_barrier_interval_by_database",
            "barrier interval of each database",
            &["database_id"],
            registry
        )
        .unwrap();

        let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "all_barrier_nums",
            "num of all_barrier",
            &["database_id"],
            registry
        )
        .unwrap();
        let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
            "in_flight_barrier_nums",
            "num of in_flight_barrier",
            &["database_id"],
            registry
        )
        .unwrap();
        let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
            "last_committed_barrier_time",
            "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
            &["database_id"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_duration_seconds",
            "snapshot backfill barrier latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "barrier_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "meta_snapshot_backfill_barrier_wait_commit_duration_seconds",
            "snapshot backfill barrier_wait_commit_latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let snapshot_backfill_wait_commit_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();

        let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_upstream_lag",
            "snapshot backfill upstream_lag",
            &["table_id"],
            registry
        )
        .unwrap();
        let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
            "meta_snapshot_backfill_inflight_barrier_num",
            "snapshot backfill inflight_barrier_num",
            &["table_id"],
            registry
        )
        .unwrap();

        let max_committed_epoch = register_int_gauge_with_registry!(
            "storage_max_committed_epoch",
            "max committed epoch",
            registry
        )
        .unwrap();

        let min_committed_epoch = register_int_gauge_with_registry!(
            "storage_min_committed_epoch",
            "min committed epoch",
            registry
        )
        .unwrap();

        let level_sst_num = register_int_gauge_vec_with_registry!(
            "storage_level_sst_num",
            "num of SSTs in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let level_compact_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_cnt",
            "num of SSTs to be merged to next level in each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let compact_frequency = register_int_counter_vec_with_registry!(
            "storage_level_compact_frequency",
            "The number of compactions from one level to another level that have completed or failed.",
            &["compactor", "group", "task_type", "result"],
            registry
        )
        .unwrap();
        let compact_skip_frequency = register_int_counter_vec_with_registry!(
            "storage_skip_compact_frequency",
            "The number of compactions from one level to another level that have been skipped.",
            &["level", "type"],
            registry
        )
        .unwrap();

        let version_size =
            register_int_gauge_with_registry!("storage_version_size", "version size", registry)
                .unwrap();

        let current_version_id = register_int_gauge_with_registry!(
            "storage_current_version_id",
            "current version id",
            registry
        )
        .unwrap();

        let checkpoint_version_id = register_int_gauge_with_registry!(
            "storage_checkpoint_version_id",
            "checkpoint version id",
            registry
        )
        .unwrap();

        let min_pinned_version_id = register_int_gauge_with_registry!(
            "storage_min_pinned_version_id",
            "min pinned version id",
            registry
        )
        .unwrap();

        let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
            "storage_write_stop_compaction_groups",
            "compaction groups of write stop state",
            &["compaction_group_id"],
            registry
        )
        .unwrap();

        let full_gc_trigger_count = register_int_gauge_with_registry!(
            "storage_full_gc_trigger_count",
            "the number of attempts to trigger full GC",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_candidate_object_count",
            "the number of candidate objects to delete after scanning the object store",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_candidate_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_full_gc_selected_object_count",
            "the number of objects to delete after filtering by the meta node",
            exponential_buckets(1.0, 10.0, 6).unwrap()
        );
        let full_gc_selected_object_count =
            register_histogram_with_registry!(opts, registry).unwrap();

        let min_safepoint_version_id = register_int_gauge_with_registry!(
            "storage_min_safepoint_version_id",
            "min safepoint version id",
            registry
        )
        .unwrap();

        let level_file_size = register_int_gauge_vec_with_registry!(
            "storage_level_total_file_size",
            "total file size (KB) of each level",
            &["level_index"],
            registry
        )
        .unwrap();

        let version_stats = register_int_gauge_vec_with_registry!(
            "storage_version_stats",
            "per table stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let materialized_view_stats = register_int_gauge_vec_with_registry!(
            "storage_materialized_view_stats",
            "per materialized view stats in current hummock version",
            &["table_id", "metric"],
            registry
        )
        .unwrap();

        let stale_object_count = register_int_gauge_with_registry!(
            "storage_stale_object_count",
            "total number of objects that are no longer referenced by any version.",
            registry
        )
        .unwrap();

        let stale_object_size = register_int_gauge_with_registry!(
            "storage_stale_object_size",
            "total size of objects that are no longer referenced by any version.",
            registry
        )
        .unwrap();

        let old_version_object_count = register_int_gauge_with_registry!(
            "storage_old_version_object_count",
            "total number of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let old_version_object_size = register_int_gauge_with_registry!(
            "storage_old_version_object_size",
            "total size of objects that are still referenced by non-current versions",
            registry
        )
        .unwrap();

        let current_version_object_count = register_int_gauge_with_registry!(
            "storage_current_version_object_count",
            "total number of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let current_version_object_size = register_int_gauge_with_registry!(
            "storage_current_version_object_size",
            "total size of objects that are referenced by the current version",
            registry
        )
        .unwrap();

        let total_object_count = register_int_gauge_with_registry!(
            "storage_total_object_count",
            "Total number of objects, including dangling objects. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let total_object_size = register_int_gauge_with_registry!(
            "storage_total_object_size",
            "Total size of objects, including dangling objects. Note that the metric is updated right before full GC, so a subsequent full GC may reduce the actual value significantly without updating the metric.",
            registry
        ).unwrap();

        let table_change_log_object_count = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_count",
            "per table change log object count",
            &["table_id"],
            registry
        )
        .unwrap();

        let table_change_log_object_size = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_object_size",
            "per table change log object size",
            &["table_id"],
            registry
        )
        .unwrap();

        let table_change_log_min_epoch = register_int_gauge_vec_with_registry!(
            "storage_table_change_log_min_epoch",
            "min epoch currently retained in table change log",
            &["table_id"],
            registry
        )
        .unwrap();

        let time_travel_object_count = register_int_gauge_with_registry!(
            "storage_time_travel_object_count",
            "total number of objects that are referenced by time travel.",
            registry
        )
        .unwrap();

        let delta_log_count = register_int_gauge_with_registry!(
            "storage_delta_log_count",
            "total number of hummock version delta logs",
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_version_checkpoint_latency",
            "hummock version checkpoint latency",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();

        let hummock_manager_lock_time = register_histogram_vec_with_registry!(
            "hummock_manager_lock_time",
            "latency for hummock manager to acquire the rwlock",
            &["lock_name", "lock_type"],
            registry
        )
        .unwrap();

        let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
            "meta_hummock_manager_real_process_time",
            "latency for hummock manager to really process the request",
            &["method"],
            registry
        )
        .unwrap();

        let worker_num = register_int_gauge_vec_with_registry!(
            "worker_num",
            "number of nodes in the cluster",
            &["worker_type"],
            registry,
        )
        .unwrap();

        let meta_type = register_int_gauge_vec_with_registry!(
            "meta_num",
            "role of meta nodes in the cluster",
            &["worker_addr", "role"],
            registry,
        )
        .unwrap();

        let compact_pending_bytes = register_int_gauge_vec_with_registry!(
            "storage_compact_pending_bytes",
            "bytes of lsm tree needed to reach balance",
            &["group"],
            registry
        )
        .unwrap();

        let compact_level_compression_ratio = register_gauge_vec_with_registry!(
            "storage_compact_level_compression_ratio",
            "compression ratio of each level of the lsm tree",
            &["group", "level", "algorithm"],
            registry
        )
        .unwrap();

        let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
            "storage_level_compact_task_cnt",
            "num of compact_task organized by group and level",
            &["task"],
            registry
        )
        .unwrap();

        let time_travel_vacuum_metadata_latency = register_histogram_with_registry!(
            histogram_opts!(
                "storage_time_travel_vacuum_metadata_latency",
                "Latency of vacuuming metadata for time travel",
                exponential_buckets(0.1, 1.5, 20).unwrap()
            ),
            registry
        )
        .unwrap();
        let time_travel_write_metadata_latency = register_histogram_with_registry!(
            histogram_opts!(
                "storage_time_travel_write_metadata_latency",
                "Latency of writing metadata for time travel",
                exponential_buckets(0.1, 1.5, 20).unwrap()
            ),
            registry
        )
        .unwrap();

        let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());

        let recovery_failure_cnt = register_int_counter_vec_with_registry!(
            "recovery_failure_cnt",
            "Number of failed recovery attempts",
            &["recovery_type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "recovery_latency",
            "Latency of the recovery process",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let recovery_latency =
            register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();

        let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_failure_cnt",
            "Number of failed auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
            "auto_schema_change_success_cnt",
            "Number of successful auto schema changes",
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "auto_schema_change_latency",
            "Latency of the auto schema change process",
            exponential_buckets(0.1, 1.5, 20).unwrap()
        );
        let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
            opts,
            &["table_id", "table_name"],
            registry
        )
        .unwrap();

        let source_is_up = register_guarded_int_gauge_vec_with_registry!(
            "source_status_is_up",
            "source is up or not",
            &["source_id", "source_name"],
            registry
        )
        .unwrap();
        let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());

        let actor_info = register_int_gauge_vec_with_registry!(
            "actor_info",
            "Mapping from actor id to (fragment id, compute node)",
            &["actor_id", "fragment_id", "compute_node"],
            registry
        )
        .unwrap();

        let table_info = register_int_gauge_vec_with_registry!(
            "table_info",
            "Mapping from table id to (materialized view id, fragment id, table name, table type, compaction group id)",
            &[
                "materialized_view_id",
                "table_id",
                "fragment_id",
                "table_name",
                "table_type",
                "compaction_group_id"
            ],
            registry
        )
        .unwrap();

        let sink_info = register_int_gauge_vec_with_registry!(
            "sink_info",
            "Mapping from actor id to (sink id, sink name)",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let relation_info = register_int_gauge_vec_with_registry!(
            "relation_info",
            "Information of the database relation (table/source/sink/materialized view/index/internal)",
            &["id", "database", "schema", "name", "resource_group", "type"],
            registry
        )
        .unwrap();

        let backfill_fragment_progress = register_int_gauge_vec_with_registry!(
            "backfill_fragment_progress",
            "Backfill progress per fragment",
            &[
                "job_id",
                "fragment_id",
                "backfill_state_table_id",
                "backfill_target_relation_id",
                "backfill_target_relation_name",
                "backfill_target_relation_type",
                "backfill_type",
                "backfill_epoch",
                "upstream_type",
                "backfill_progress",
            ],
            registry
        )
        .unwrap();

        let l0_compact_level_count = register_histogram_vec_with_registry!(
            "storage_l0_compact_level_count",
            "level_count of l0 compact task",
            &["group", "type"],
            registry
        )
        .unwrap();

        let system_param_info = register_int_gauge_vec_with_registry!(
            "system_param_info",
            "Information of system parameters",
            &["name", "value"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_size",
            "Total size of compact tasks that have been issued to the state store",
            exponential_buckets(1048576.0, 2.0, 16).unwrap()
        );

        let compact_task_size =
            register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();

        let compact_task_file_count = register_histogram_vec_with_registry!(
            "storage_compact_task_file_count",
            "file count of compact task",
            &["group", "type"],
            registry
        )
        .unwrap();
        let opts = histogram_opts!(
            "storage_compact_task_batch_count",
            "count of compact task batch",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_batch_count =
            register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();

        let table_write_throughput = register_int_counter_vec_with_registry!(
            "storage_commit_write_throughput",
            "The accumulated bytes written by each table at commit time.",
            &["table_id"],
            registry
        )
        .unwrap();

        let split_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_split_compaction_group_count",
            "Count of split compaction group triggers",
            &["group"],
            registry
        )
        .unwrap();

        let state_table_count = register_int_gauge_vec_with_registry!(
            "storage_state_table_count",
            "Count of state tables per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let branched_sst_count = register_int_gauge_vec_with_registry!(
            "storage_branched_sst_count",
            "Count of branched SSTs per compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_consumed_latency",
            "The latency (ms) of each event being consumed",
            exponential_buckets(1.0, 1.5, 30).unwrap()
        );
        let compaction_event_consumed_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let opts = histogram_opts!(
            "storage_compaction_event_loop_iteration_latency",
            "The latency (ms) of each iteration of the compaction event loop",
            exponential_buckets(1.0, 1.5, 30).unwrap()
        );
        let compaction_event_loop_iteration_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let merge_compaction_group_count = register_int_counter_vec_with_registry!(
            "storage_merge_compaction_group_count",
            "Count of merge compaction group triggers",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_time_travel_version_replay_latency",
            "The latency (ms) of replaying a hummock version for time travel",
            exponential_buckets(0.01, 10.0, 6).unwrap()
        );
        let time_travel_version_replay_latency =
            register_histogram_with_registry!(opts, registry).unwrap();

        let compaction_group_count = register_int_gauge_with_registry!(
            "storage_compaction_group_count",
            "The number of compaction groups",
            registry,
        )
        .unwrap();

        let compaction_group_size = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_size",
            "The size of compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_file_count = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_file_count",
            "The file count of compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let compaction_group_throughput = register_int_gauge_vec_with_registry!(
            "storage_compaction_group_throughput",
            "The throughput of compaction group",
            &["group"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "storage_compact_task_trivial_move_sst_count",
            "sst count of compact trivial-move task",
            exponential_buckets(1.0, 2.0, 8).unwrap()
        );
        let compact_task_trivial_move_sst_count =
            register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();

        let refresh_job_duration = register_guarded_uint_gauge_vec_with_registry!(
            "meta_refresh_job_duration",
            "The duration of refresh job",
            &["table_id", "status"],
            registry
        )
        .unwrap();
        let refresh_job_finish_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_job_finish_cnt",
            "The number of finished refresh jobs",
            &["table_id", "status"],
            registry
        )
        .unwrap();
        let refresh_cron_job_trigger_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_cron_job_trigger_cnt",
            "The number of cron refresh jobs triggered",
            &["table_id"],
            registry
        )
        .unwrap();
        let refresh_cron_job_miss_cnt = register_guarded_int_counter_vec_with_registry!(
            "meta_refresh_cron_job_miss_cnt",
            "The number of cron refresh jobs missed",
            &["table_id"],
            registry
        )
        .unwrap();

        Self {
            grpc_latency,
            barrier_latency,
            barrier_wait_commit_latency,
            barrier_send_latency,
            all_barrier_nums,
            in_flight_barrier_nums,
            last_committed_barrier_time,
            barrier_interval_by_database,
            snapshot_backfill_barrier_latency,
            snapshot_backfill_wait_commit_latency,
            snapshot_backfill_lag,
            snapshot_backfill_inflight_barrier_num,
            recovery_failure_cnt,
            recovery_latency,

            max_committed_epoch,
            min_committed_epoch,
            level_sst_num,
            level_compact_cnt,
            compact_frequency,
            compact_skip_frequency,
            level_file_size,
            version_size,
            version_stats,
            materialized_view_stats,
            stale_object_count,
            stale_object_size,
            old_version_object_count,
            old_version_object_size,
            time_travel_object_count,
            current_version_object_count,
            current_version_object_size,
            total_object_count,
            total_object_size,
            table_change_log_object_count,
            table_change_log_object_size,
            table_change_log_min_epoch,
            delta_log_count,
            version_checkpoint_latency,
            current_version_id,
            checkpoint_version_id,
            min_pinned_version_id,
            min_safepoint_version_id,
            write_stop_compaction_groups,
            full_gc_trigger_count,
            full_gc_candidate_object_count,
            full_gc_selected_object_count,
            hummock_manager_lock_time,
            hummock_manager_real_process_time,
            time_after_last_observation: Arc::new(AtomicU64::new(0)),
            worker_num,
            meta_type,
            compact_pending_bytes,
            compact_level_compression_ratio,
            level_compact_task_cnt,
            object_store_metric,
            source_is_up,
            source_enumerator_metrics,
            actor_info,
            table_info,
            sink_info,
            relation_info,
            backfill_fragment_progress,
            system_param_info,
            l0_compact_level_count,
            compact_task_size,
            compact_task_file_count,
            compact_task_batch_count,
            compact_task_trivial_move_sst_count,
            table_write_throughput,
            split_compaction_group_count,
            state_table_count,
            branched_sst_count,
            compaction_event_consumed_latency,
            compaction_event_loop_iteration_latency,
            auto_schema_change_failure_cnt,
            auto_schema_change_success_cnt,
            auto_schema_change_latency,
            merge_compaction_group_count,
            time_travel_version_replay_latency,
            compaction_group_count,
            compaction_group_size,
            compaction_group_file_count,
            compaction_group_throughput,
            refresh_job_duration,
            refresh_job_finish_cnt,
            refresh_cron_job_trigger_cnt,
            refresh_cron_job_miss_cnt,
            time_travel_vacuum_metadata_latency,
            time_travel_write_metadata_latency,
        }
    }

    #[cfg(test)]
    pub fn for_test(registry: &Registry) -> Self {
        Self::new(registry)
    }
}

impl Default for MetaMetrics {
    fn default() -> Self {
        GLOBAL_META_METRICS.clone()
    }
}

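/// Refreshes the `system_param_info` gauge: resets it, then re-exports every system
/// parameter as a (name, value)-labeled series with value 1.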
pub async fn refresh_system_param_info_metrics(
    system_params_controller: &SystemParamsControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let params_info = system_params_controller.get_params().await.get_all();

    meta_metrics.system_param_info.reset();
    for info in params_info {
        meta_metrics
            .system_param_info
            .with_label_values(&[info.name, &info.value])
            .set(1);
    }
}

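/// Spawns a task that periodically refreshes the `worker_num` and `meta_type` gauges
/// from the cluster metadata and the meta election state. Returns the join handle and
/// a oneshot sender that stops the monitor.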
pub fn start_worker_info_monitor(
    metadata_manager: MetadataManager,
    election_client: ElectionClientRef,
    interval: Duration,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval = tokio::time::interval(interval);
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                _ = monitor_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Worker number monitor is stopped");
                    return;
                }
            }

            let node_map = match metadata_manager.count_worker_node().await {
                Ok(node_map) => node_map,
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "fail to count worker node");
                    continue;
                }
            };

            meta_metrics.worker_num.reset();
            meta_metrics.meta_type.reset();

            for (worker_type, worker_num) in node_map {
                meta_metrics
                    .worker_num
                    .with_label_values(&[worker_type.as_str_name()])
                    .set(worker_num as i64);
            }
            if let Ok(meta_members) = election_client.get_members().await {
                meta_metrics
                    .worker_num
                    .with_label_values(&[WorkerType::Meta.as_str_name()])
                    .set(meta_members.len() as i64);
                meta_members.into_iter().for_each(|m| {
                    let role = if m.is_leader { "leader" } else { "follower" };
                    meta_metrics
                        .meta_type
                        .with_label_values(&[m.id.as_str(), role])
                        .set(1);
                });
            }
        }
    });

    (join_handle, shutdown_tx)
}

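/// Rebuilds the `actor_info`, `table_info`, and `sink_info` gauges from the current
/// actor locations, sink mappings, fragment state tables, and compaction group
/// mapping. Each series carries its information in labels and is set to 1.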
pub async fn refresh_fragment_info_metrics(
    catalog_controller: &CatalogControllerRef,
    cluster_controller: &ClusterControllerRef,
    hummock_manager: &HummockManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let worker_nodes = match cluster_controller
        .list_workers(Some(WorkerType::ComputeNode.into()), None)
        .await
    {
        Ok(worker_nodes) => worker_nodes,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list worker node");
            return;
        }
    };
    let actor_locations = match catalog_controller.list_actor_locations() {
        Ok(actor_locations) => actor_locations,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get actor locations");
            return;
        }
    };
    let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
        Ok(sink_actor_mapping) => sink_actor_mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
            return;
        }
    };
    let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
        Ok(fragment_state_tables) => fragment_state_tables,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
            return;
        }
    };
    let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
        Ok(mapping) => mapping,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
            return;
        }
    };

    let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
        .into_iter()
        .map(|worker_node| {
            let addr = match worker_node.host {
                Some(host) => format!("{}:{}", host.host, host.port),
                None => "".to_owned(),
            };
            (worker_node.id, addr)
        })
        .collect();
    let table_compaction_group_id_mapping = hummock_manager
        .get_table_compaction_group_id_mapping()
        .await;

    meta_metrics.actor_info.reset();
    meta_metrics.table_info.reset();
    meta_metrics.sink_info.reset();
    for actor_location in actor_locations {
        let actor_id_str = actor_location.actor_id.to_string();
        let fragment_id_str = actor_location.fragment_id.to_string();
        if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
            meta_metrics
                .actor_info
                .with_label_values(&[&actor_id_str, &fragment_id_str, address])
                .set(1);
        }
    }
    for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
        let sink_id_str = sink_id.to_string();
        for actor_id in actor_ids {
            let actor_id_str = actor_id.to_string();
            meta_metrics
                .sink_info
                .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
                .set(1);
        }
    }
    for PartialFragmentStateTables {
        fragment_id,
        job_id,
        state_table_ids,
    } in fragment_state_tables
    {
        let fragment_id_str = fragment_id.to_string();
        let job_id_str = job_id.to_string();
        for table_id in state_table_ids.into_inner() {
            let table_id_str = table_id.to_string();
            let (table_name, table_type) = table_name_and_type_mapping
                .get(&table_id)
                .cloned()
                .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
            let compaction_group_id = table_compaction_group_id_mapping
                .get(&table_id)
                .map(|cg_id| cg_id.to_string())
                .unwrap_or_else(|| "unknown".to_owned());
            meta_metrics
                .table_info
                .with_label_values(&[
                    &job_id_str,
                    &table_id_str,
                    &fragment_id_str,
                    &table_name,
                    &table_type,
                    &compaction_group_id,
                ])
                .set(1);
        }
    }
}

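/// Rebuilds the `relation_info` gauge from the catalog's table, source, and sink
/// objects, labeling each series with id, database, schema, name, resource group,
/// and relation type.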
pub async fn refresh_relation_info_metrics(
    catalog_controller: &CatalogControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let table_objects = match catalog_controller.list_table_objects().await {
        Ok(table_objects) => table_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get table objects");
            return;
        }
    };

    let source_objects = match catalog_controller.list_source_objects().await {
        Ok(source_objects) => source_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get source objects");
            return;
        }
    };

    let sink_objects = match catalog_controller.list_sink_objects().await {
        Ok(sink_objects) => sink_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get sink objects");
            return;
        }
    };

    meta_metrics.relation_info.reset();

    for (id, db, schema, name, resource_group, table_type) in table_objects {
        let relation_type = match table_type {
            TableType::Table => "table",
            TableType::MaterializedView => "materialized_view",
            TableType::Index | TableType::VectorIndex => "index",
            TableType::Internal => "internal",
        };
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                &relation_type.to_owned(),
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in source_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                &"source".to_owned(),
            ])
            .set(1);
    }

    for (id, db, schema, name, resource_group) in sink_objects {
        meta_metrics
            .relation_info
            .with_label_values(&[
                &id.to_string(),
                &db,
                &schema,
                &name,
                &resource_group,
                &"sink".to_owned(),
            ])
            .set(1);
    }
}

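/// Classifies a fragment as a source backfill, snapshot backfill, or
/// arrangement/no-shuffle backfill based on its fragment type mask, then visits its
/// stream node to pull out the ids needed for progress reporting. Returns `None`
/// for fragments that do not perform backfill.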
fn extract_backfill_fragment_info(
    distribution: &FragmentDistribution,
) -> Option<BackfillFragmentInfo> {
    let backfill_type =
        if distribution.fragment_type_mask & FragmentTypeFlag::SourceScan as u32 != 0 {
            "SOURCE"
        } else if distribution.fragment_type_mask
            & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32
                | FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32)
            != 0
        {
            "SNAPSHOT_BACKFILL"
        } else if distribution.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0 {
            "ARRANGEMENT_OR_NO_SHUFFLE"
        } else {
            return None;
        };

    let stream_node = distribution.node.as_ref()?;
    let mut info = None;
    match backfill_type {
        "SOURCE" => {
            visit_stream_node_source_backfill(stream_node, |node| {
                info = Some(BackfillFragmentInfo {
                    job_id: distribution.table_id.as_raw_id(),
                    fragment_id: distribution.fragment_id.as_raw_id(),
                    backfill_state_table_id: node
                        .state_table
                        .as_ref()
                        .map(|table| table.id.as_raw_id())
                        .unwrap_or_default(),
                    backfill_target_relation_id: node.upstream_source_id.as_raw_id(),
                    backfill_type,
                    backfill_epoch: 0,
                });
            });
        }
        "SNAPSHOT_BACKFILL" | "ARRANGEMENT_OR_NO_SHUFFLE" => {
            visit_stream_node_stream_scan(stream_node, |node| {
                info = Some(BackfillFragmentInfo {
                    job_id: distribution.table_id.as_raw_id(),
                    fragment_id: distribution.fragment_id.as_raw_id(),
                    backfill_state_table_id: node
                        .state_table
                        .as_ref()
                        .map(|table| table.id.as_raw_id())
                        .unwrap_or_default(),
                    backfill_target_relation_id: node.table_id.as_raw_id(),
                    backfill_type,
                    backfill_epoch: node.snapshot_backfill_epoch.unwrap_or_default(),
                });
            });
        }
        _ => {}
    }

    info
}

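/// Rebuilds the `backfill_fragment_progress` gauge: collects backfill fragments from
/// the catalog, joins them with per-fragment progress from the barrier manager and
/// key counts from the Hummock version stats, and renders a human-readable progress
/// label per fragment.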
pub async fn refresh_backfill_progress_metrics(
    catalog_controller: &CatalogControllerRef,
    hummock_manager: &HummockManagerRef,
    barrier_manager: &BarrierManagerRef,
    meta_metrics: Arc<MetaMetrics>,
) {
    let fragment_descs = match catalog_controller.list_fragment_descs_with_node(true).await {
        Ok(fragment_descs) => fragment_descs,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to list creating fragment descs");
            return;
        }
    };

    let backfill_infos: HashMap<(u32, u32), BackfillFragmentInfo> = fragment_descs
        .iter()
        .filter_map(|(distribution, _)| extract_backfill_fragment_info(distribution))
        .map(|info| ((info.job_id, info.fragment_id), info))
        .collect();

    let fragment_progresses = match barrier_manager.get_fragment_backfill_progress().await {
        Ok(progress) => progress,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get fragment backfill progress");
            return;
        }
    };

    let progress_by_fragment: HashMap<(u32, u32), _> = fragment_progresses
        .into_iter()
        .map(|progress| {
            (
                (
                    progress.job_id.as_raw_id(),
                    progress.fragment_id.as_raw_id(),
                ),
                progress,
            )
        })
        .collect();

    let version_stats = hummock_manager.get_version_stats().await;

    let relation_ids: HashSet<_> = backfill_infos
        .values()
        .map(|info| ObjectId::new(info.backfill_target_relation_id))
        .collect();
    let relation_objects = match catalog_controller
        .list_relation_objects_by_ids(&relation_ids)
        .await
    {
        Ok(relation_objects) => relation_objects,
        Err(err) => {
            tracing::warn!(error=%err.as_report(), "fail to get relation objects");
            return;
        }
    };

    let mut relation_info = HashMap::new();
    for (id, db, schema, name, rel_type) in relation_objects {
        relation_info.insert(id.as_raw_id(), (db, schema, name, rel_type));
    }

    meta_metrics.backfill_fragment_progress.reset();

    for info in backfill_infos.values() {
        let progress = progress_by_fragment.get(&(info.job_id, info.fragment_id));
        let (db, schema, name, rel_type) = relation_info
            .get(&info.backfill_target_relation_id)
            .cloned()
            .unwrap_or_else(|| {
                (
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                    "unknown".to_owned(),
                )
            });

        let job_id_str = info.job_id.to_string();
        let fragment_id_str = info.fragment_id.to_string();
        let backfill_state_table_id_str = info.backfill_state_table_id.to_string();
        let backfill_target_relation_id_str = info.backfill_target_relation_id.to_string();
        let backfill_target_relation_name_str = format!("{db}.{schema}.{name}");
        let backfill_target_relation_type_str = rel_type;
        let backfill_type_str = info.backfill_type.to_owned();
        let backfill_epoch_str = info.backfill_epoch.to_string();
        let total_key_count = version_stats
            .table_stats
            .get(&TableId::new(info.backfill_target_relation_id))
            .map(|stats| stats.total_key_count as u64);

        let progress_label = match (info.backfill_type, progress) {
            ("SOURCE", Some(progress)) => format!("{} consumed rows", progress.consumed_rows),
            ("SOURCE", None) => "0 consumed rows".to_owned(),
            (_, Some(progress)) if progress.done => {
                let total = total_key_count.unwrap_or(0);
                format!("100.0000% ({}/{})", total, total)
            }
            (_, Some(progress)) => {
                let total = total_key_count.unwrap_or(0);
                if total == 0 {
                    "100.0000% (0/0)".to_owned()
                } else {
                    let raw = (progress.consumed_rows as f64) / (total as f64) * 100.0;
                    format!(
                        "{:.4}% ({}/{})",
                        raw.min(100.0),
                        progress.consumed_rows,
                        total
                    )
                }
            }
            (_, None) => "0.0000% (0/0)".to_owned(),
        };
        let upstream_type_str = progress
            .map(|progress| progress.upstream_type.to_string())
            .unwrap_or_else(|| "Unknown".to_owned());

        meta_metrics
            .backfill_fragment_progress
            .with_label_values(&[
                &job_id_str,
                &fragment_id_str,
                &backfill_state_table_id_str,
                &backfill_target_relation_id_str,
                &backfill_target_relation_name_str,
                &backfill_target_relation_type_str,
                &backfill_type_str,
                &backfill_epoch_str,
                &upstream_type_str,
                &progress_label,
            ])
            .set(1);
    }
}

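/// Spawns the periodic info monitor, which refreshes fragment, relation, backfill
/// progress, and system parameter metrics every `COLLECT_INTERVAL_SECONDS`. Returns
/// the join handle and a oneshot shutdown sender.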
pub fn start_info_monitor(
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    barrier_manager: BarrierManagerRef,
    system_params_controller: SystemParamsControllerRef,
    meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
    const COLLECT_INTERVAL_SECONDS: u64 = 60;

    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let join_handle = tokio::spawn(async move {
        let mut monitor_interval =
            tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
        monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                _ = monitor_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Meta info monitor is stopped");
                    return;
                }
            }

            refresh_fragment_info_metrics(
                &metadata_manager.catalog_controller,
                &metadata_manager.cluster_controller,
                &hummock_manager,
                meta_metrics.clone(),
            )
            .await;

            refresh_relation_info_metrics(
                &metadata_manager.catalog_controller,
                meta_metrics.clone(),
            )
            .await;

            refresh_backfill_progress_metrics(
                &metadata_manager.catalog_controller,
                &hummock_manager,
                &barrier_manager,
                meta_metrics.clone(),
            )
            .await;

            refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
                .await;
        }
    });

    (join_handle, shutdown_tx)
}