1use std::collections::{HashMap, HashSet};
16use std::sync::atomic::AtomicU64;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicF64, GenericGaugeVec};
21use prometheus::{
22 GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
23 exponential_buckets, histogram_opts, register_gauge_vec_with_registry,
24 register_histogram_vec_with_registry, register_histogram_with_registry,
25 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
26 register_int_gauge_with_registry,
27};
28use risingwave_common::catalog::{FragmentTypeFlag, TableId};
29use risingwave_common::metrics::{
30 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
31 LabelGuardedUintGaugeVec,
32};
33use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
34use risingwave_common::system_param::reader::SystemParamsRead;
35use risingwave_common::util::stream_graph_visitor::{
36 visit_stream_node_source_backfill, visit_stream_node_stream_scan,
37};
38use risingwave_common::{
39 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
40 register_guarded_int_gauge_vec_with_registry, register_guarded_uint_gauge_vec_with_registry,
41};
42use risingwave_connector::source::monitor::EnumeratorMetrics as SourceEnumeratorMetrics;
43use risingwave_meta_model::table::TableType;
44use risingwave_meta_model::{ObjectId, WorkerId};
45use risingwave_object_store::object::object_metrics::{
46 GLOBAL_OBJECT_STORE_METRICS, ObjectStoreMetrics,
47};
48use risingwave_pb::common::WorkerType;
49use risingwave_pb::meta::FragmentDistribution;
50use thiserror_ext::AsReport;
51use tokio::sync::oneshot::Sender;
52use tokio::task::JoinHandle;
53
54use crate::barrier::BarrierManagerRef;
55use crate::controller::catalog::CatalogControllerRef;
56use crate::controller::cluster::ClusterControllerRef;
57use crate::controller::system_param::SystemParamsControllerRef;
58use crate::controller::utils::PartialFragmentStateTables;
59use crate::hummock::HummockManagerRef;
60use crate::manager::MetadataManager;
61use crate::rpc::ElectionClientRef;
62
/// Identifiers describing one backfilling fragment.
///
/// NOTE(review): this struct is not constructed in the visible part of the file.
/// Its field names mirror several labels of the `backfill_fragment_progress` gauge
/// registered in `MetaMetrics::new`. It presumably carries the label values for
/// one fragment's progress series; confirm against its constructor.
struct BackfillFragmentInfo {
    /// Id of the streaming job that owns the fragment.
    job_id: u32,
    /// Id of the backfilling fragment itself.
    fragment_id: u32,
    /// Id of the state table that stores the backfill progress.
    backfill_state_table_id: u32,
    /// Id of the upstream relation being backfilled from.
    backfill_target_relation_id: u32,
    /// Kind of backfill. `&'static str`, so only string literals fit here;
    /// presumably a small fixed set of names (confirm at the construction site).
    backfill_type: &'static str,
    /// Epoch associated with the backfill (semantics defined outside this view).
    backfill_epoch: u64,
}
71
/// Handles to every Prometheus collector exported by the meta node.
///
/// All collectors except `time_after_last_observation`, `object_store_metric` and
/// `source_enumerator_metrics` are registered into a `Registry` by
/// `MetaMetrics::new`. The comment on each field names the metric it is registered
/// under and, in braces, its label names.
#[derive(Clone)]
pub struct MetaMetrics {
    /// `worker_num{worker_type}`: number of nodes in the cluster, per worker type.
    pub worker_num: IntGaugeVec,
    /// `meta_num{worker_addr, role}`: role of each meta node in the cluster.
    pub meta_type: IntGaugeVec,

    /// `meta_grpc_duration_seconds{path}`: gRPC latency of meta services.
    pub grpc_latency: HistogramVec,

    /// `meta_barrier_duration_seconds{database_id}`: barrier latency.
    pub barrier_latency: LabelGuardedHistogramVec,
    /// `meta_barrier_wait_commit_duration_seconds`: barrier wait-commit latency.
    pub barrier_wait_commit_latency: Histogram,
    /// `meta_barrier_send_duration_seconds{database_id}`: barrier send latency.
    pub barrier_send_latency: LabelGuardedHistogramVec,
    /// `all_barrier_nums{database_id}`: number of barriers.
    pub all_barrier_nums: LabelGuardedIntGaugeVec,
    /// `in_flight_barrier_nums{database_id}`: number of in-flight barriers.
    pub in_flight_barrier_nums: LabelGuardedIntGaugeVec,
    /// `last_committed_barrier_time{database_id}`: UNIX-epoch seconds of the last
    /// committed barrier's epoch time.
    pub last_committed_barrier_time: IntGaugeVec,
    /// `meta_barrier_interval_by_database{database_id}`: barrier interval per database.
    pub barrier_interval_by_database: GaugeVec,

    /// `meta_snapshot_backfill_barrier_duration_seconds{table_id, barrier_type}`.
    pub snapshot_backfill_barrier_latency: LabelGuardedHistogramVec,
    /// `meta_snapshot_backfill_upstream_lag{table_id}`.
    pub snapshot_backfill_lag: LabelGuardedIntGaugeVec,
    /// `meta_snapshot_backfill_inflight_barrier_num{table_id}`.
    pub snapshot_backfill_inflight_barrier_num: LabelGuardedIntGaugeVec,
    /// `recovery_failure_cnt{recovery_type}`: failed recovery attempts.
    pub recovery_failure_cnt: IntCounterVec,
    /// `recovery_latency{recovery_type}`: latency of the recovery process.
    pub recovery_latency: HistogramVec,

    /// `storage_max_committed_epoch`.
    pub max_committed_epoch: IntGauge,
    /// `storage_min_committed_epoch`.
    pub min_committed_epoch: IntGauge,
    /// `storage_level_sst_num{level_index}`: SST count per level.
    pub level_sst_num: IntGaugeVec,
    /// `storage_level_compact_cnt{level_index}`: SSTs to be merged into the next level.
    pub level_compact_cnt: IntGaugeVec,
    /// `storage_level_compact_frequency{compactor, group, task_type, result}`:
    /// completed or failed compactions.
    pub compact_frequency: IntCounterVec,
    /// `storage_level_total_file_size{level_index}`: total file size per level
    /// (help text states KBs).
    pub level_file_size: IntGaugeVec,
    /// `storage_version_size`.
    pub version_size: IntGauge,
    /// `storage_current_version_id`.
    pub current_version_id: IntGauge,
    /// `storage_checkpoint_version_id`.
    pub checkpoint_version_id: IntGauge,
    /// `storage_min_pinned_version_id`.
    pub min_pinned_version_id: IntGauge,
    /// `storage_min_safepoint_version_id`.
    pub min_safepoint_version_id: IntGauge,
    /// `storage_write_stop_compaction_groups{compaction_group_id}`.
    pub write_stop_compaction_groups: IntGaugeVec,
    /// `storage_full_gc_trigger_count`: attempts to trigger full GC (a gauge, not a counter).
    pub full_gc_trigger_count: IntGauge,
    /// `storage_full_gc_candidate_object_count`: candidates found by scanning the object store.
    pub full_gc_candidate_object_count: Histogram,
    /// `storage_full_gc_selected_object_count`: objects kept after meta-node filtering.
    pub full_gc_selected_object_count: Histogram,
    /// `storage_version_stats{table_id, metric}`: per-table stats in the current version.
    pub version_stats: IntGaugeVec,
    /// `storage_materialized_view_stats{table_id, metric}`.
    pub materialized_view_stats: IntGaugeVec,
    /// `storage_stale_object_count`: objects no longer referenced by any version.
    pub stale_object_count: IntGauge,
    /// `storage_stale_object_size`.
    pub stale_object_size: IntGauge,
    /// `storage_old_version_object_count`: objects referenced only by non-current versions.
    pub old_version_object_count: IntGauge,
    /// `storage_old_version_object_size`.
    pub old_version_object_size: IntGauge,
    /// `storage_time_travel_object_count`: objects referenced by time travel.
    pub time_travel_object_count: IntGauge,
    /// `storage_current_version_object_count`.
    pub current_version_object_count: IntGauge,
    /// `storage_current_version_object_size`.
    pub current_version_object_size: IntGauge,
    /// `storage_total_object_count`: includes dangling objects; per its help text it
    /// is updated right before full GC.
    pub total_object_count: IntGauge,
    /// `storage_total_object_size`: same update cadence as `total_object_count`.
    pub total_object_size: IntGauge,
    /// `storage_table_change_log_object_count{table_id}`.
    pub table_change_log_object_count: IntGaugeVec,
    /// `storage_table_change_log_object_size{table_id}`.
    pub table_change_log_object_size: IntGaugeVec,
    /// `storage_table_change_log_min_epoch{table_id}`.
    pub table_change_log_min_epoch: IntGaugeVec,
    /// `storage_delta_log_count`: number of hummock version delta logs.
    pub delta_log_count: IntGauge,
    /// `storage_version_checkpoint_latency`.
    pub version_checkpoint_latency: Histogram,
    /// `hummock_manager_lock_time{lock_name, lock_type}`: lock acquisition latency.
    pub hummock_manager_lock_time: HistogramVec,
    /// `meta_hummock_manager_real_process_time{method}`.
    pub hummock_manager_real_process_time: HistogramVec,
    /// `storage_skip_compact_frequency{level, type}`: skipped compactions.
    pub compact_skip_frequency: IntCounterVec,
    /// `storage_compact_pending_bytes{group}`.
    pub compact_pending_bytes: IntGaugeVec,
    /// `storage_compact_level_compression_ratio{group, level, algorithm}`.
    pub compact_level_compression_ratio: GenericGaugeVec<AtomicF64>,
    /// `storage_level_compact_task_cnt{task}`.
    pub level_compact_task_cnt: IntGaugeVec,
    /// Not a Prometheus collector; `new` initializes it to 0. Its meaning is defined
    /// by code outside this view (presumably a last-observation timestamp; confirm).
    pub time_after_last_observation: Arc<AtomicU64>,
    /// `storage_l0_compact_level_count{group, type}`.
    pub l0_compact_level_count: HistogramVec,
    /// `storage_compact_task_size{group, type}`.
    pub compact_task_size: HistogramVec,
    /// `storage_compact_task_file_count{group, type}`.
    pub compact_task_file_count: HistogramVec,
    /// `storage_compact_task_batch_count{type}`.
    pub compact_task_batch_count: HistogramVec,
    /// `storage_split_compaction_group_count{group}`.
    pub split_compaction_group_count: IntCounterVec,
    /// `storage_state_table_count{group}`: state tables per compaction group.
    pub state_table_count: IntGaugeVec,
    /// `storage_branched_sst_count{group}`.
    pub branched_sst_count: IntGaugeVec,
    /// `storage_compact_task_trivial_move_sst_count{group}`.
    pub compact_task_trivial_move_sst_count: HistogramVec,

    /// `storage_compaction_event_consumed_latency`.
    pub compaction_event_consumed_latency: Histogram,
    /// `storage_compaction_event_loop_iteration_latency`.
    pub compaction_event_loop_iteration_latency: Histogram,
    /// `storage_time_travel_vacuum_metadata_latency`.
    pub time_travel_vacuum_metadata_latency: Histogram,
    /// `storage_time_travel_write_metadata_latency`.
    pub time_travel_write_metadata_latency: Histogram,

    /// Shared handle built from a clone of `GLOBAL_OBJECT_STORE_METRICS`; not
    /// registered by `new`.
    pub object_store_metric: Arc<ObjectStoreMetrics>,

    /// `source_status_is_up{source_id, source_name}`.
    pub source_is_up: LabelGuardedIntGaugeVec,
    /// Built with `SourceEnumeratorMetrics::default()`; not registered into the
    /// registry passed to `new`.
    pub source_enumerator_metrics: Arc<SourceEnumeratorMetrics>,

    /// `actor_info{actor_id, fragment_id, compute_node}`.
    pub actor_info: IntGaugeVec,
    /// `table_info{materialized_view_id, table_id, fragment_id, table_name, table_type,
    /// compaction_group_id}`.
    pub table_info: IntGaugeVec,
    /// `sink_info{actor_id, sink_id, sink_name}`.
    pub sink_info: IntGaugeVec,
    /// `relation_info{id, database, schema, name, resource_group, type}`.
    pub relation_info: IntGaugeVec,
    /// `database_info{database_id, database_name}`.
    pub database_info: IntGaugeVec,
    /// `backfill_fragment_progress{job_id, fragment_id, backfill_state_table_id,
    /// backfill_target_relation_id, backfill_target_relation_name,
    /// backfill_target_relation_type, backfill_type, backfill_epoch, upstream_type,
    /// backfill_progress}`.
    pub backfill_fragment_progress: IntGaugeVec,
    /// `streaming_table_change_log_retention_seconds{table_id}`.
    pub streaming_table_change_log_retention_seconds: IntGaugeVec,

    /// `system_param_info{name, value}`; `refresh_system_param_info_metrics` resets it
    /// and sets one series per parameter to 1.
    pub system_param_info: IntGaugeVec,

    /// `storage_commit_write_throughput{table_id}`.
    pub table_write_throughput: IntCounterVec,

    /// `storage_merge_compaction_group_count{group}`.
    pub merge_compaction_group_count: IntCounterVec,

    /// `auto_schema_change_failure_cnt{table_id, table_name}`.
    pub auto_schema_change_failure_cnt: LabelGuardedIntCounterVec,
    /// `auto_schema_change_success_cnt{table_id, table_name}`.
    pub auto_schema_change_success_cnt: LabelGuardedIntCounterVec,
    /// `auto_schema_change_latency{table_id, table_name}`.
    pub auto_schema_change_latency: LabelGuardedHistogramVec,

    /// `storage_time_travel_version_replay_latency`.
    pub time_travel_version_replay_latency: Histogram,

    /// `storage_compaction_group_count`.
    pub compaction_group_count: IntGauge,
    /// `storage_compaction_group_size{group}`.
    pub compaction_group_size: IntGaugeVec,
    /// `storage_compaction_group_file_count{group}`.
    pub compaction_group_file_count: IntGaugeVec,
    /// `storage_compaction_group_throughput{group}`.
    pub compaction_group_throughput: IntGaugeVec,

    /// `meta_refresh_job_duration{table_id, status}`.
    pub refresh_job_duration: LabelGuardedUintGaugeVec,
    /// `meta_refresh_job_finish_cnt{table_id, status}`.
    pub refresh_job_finish_cnt: LabelGuardedIntCounterVec,
    /// `meta_refresh_cron_job_trigger_cnt{table_id}`.
    pub refresh_cron_job_trigger_cnt: LabelGuardedIntCounterVec,
    /// `meta_refresh_cron_job_miss_cnt{table_id}`.
    pub refresh_cron_job_miss_cnt: LabelGuardedIntCounterVec,
}
258
259pub static GLOBAL_META_METRICS: LazyLock<MetaMetrics> =
260 LazyLock::new(|| MetaMetrics::new(&GLOBAL_METRICS_REGISTRY));
261
262impl MetaMetrics {
263 fn new(registry: &Registry) -> Self {
264 let opts = histogram_opts!(
265 "meta_grpc_duration_seconds",
266 "gRPC latency of meta services",
267 exponential_buckets(0.0001, 2.0, 20).unwrap() );
269 let grpc_latency =
270 register_histogram_vec_with_registry!(opts, &["path"], registry).unwrap();
271
272 let opts = histogram_opts!(
273 "meta_barrier_duration_seconds",
274 "barrier latency",
275 exponential_buckets(0.1, 1.5, 20).unwrap() );
277 let barrier_latency =
278 register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
279 .unwrap();
280
281 let opts = histogram_opts!(
282 "meta_barrier_wait_commit_duration_seconds",
283 "barrier_wait_commit_latency",
284 exponential_buckets(0.1, 1.5, 20).unwrap() );
286 let barrier_wait_commit_latency =
287 register_histogram_with_registry!(opts, registry).unwrap();
288
289 let opts = histogram_opts!(
290 "meta_barrier_send_duration_seconds",
291 "barrier send latency",
292 exponential_buckets(0.1, 1.5, 19).unwrap() );
294 let barrier_send_latency =
295 register_guarded_histogram_vec_with_registry!(opts, &["database_id"], registry)
296 .unwrap();
297 let barrier_interval_by_database = register_gauge_vec_with_registry!(
298 "meta_barrier_interval_by_database",
299 "barrier interval of each database",
300 &["database_id"],
301 registry
302 )
303 .unwrap();
304
305 let all_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
306 "all_barrier_nums",
307 "num of of all_barrier",
308 &["database_id"],
309 registry
310 )
311 .unwrap();
312 let in_flight_barrier_nums = register_guarded_int_gauge_vec_with_registry!(
313 "in_flight_barrier_nums",
314 "num of of in_flight_barrier",
315 &["database_id"],
316 registry
317 )
318 .unwrap();
319 let last_committed_barrier_time = register_int_gauge_vec_with_registry!(
320 "last_committed_barrier_time",
321 "The timestamp (UNIX epoch seconds) of the last committed barrier's epoch time.",
322 &["database_id"],
323 registry
324 )
325 .unwrap();
326
327 let opts = histogram_opts!(
329 "meta_snapshot_backfill_barrier_duration_seconds",
330 "snapshot backfill barrier latency",
331 exponential_buckets(0.1, 1.5, 20).unwrap() );
333 let snapshot_backfill_barrier_latency = register_guarded_histogram_vec_with_registry!(
334 opts,
335 &["table_id", "barrier_type"],
336 registry
337 )
338 .unwrap();
339
340 let snapshot_backfill_lag = register_guarded_int_gauge_vec_with_registry!(
341 "meta_snapshot_backfill_upstream_lag",
342 "snapshot backfill upstream_lag",
343 &["table_id"],
344 registry
345 )
346 .unwrap();
347 let snapshot_backfill_inflight_barrier_num = register_guarded_int_gauge_vec_with_registry!(
348 "meta_snapshot_backfill_inflight_barrier_num",
349 "snapshot backfill inflight_barrier_num",
350 &["table_id"],
351 registry
352 )
353 .unwrap();
354
355 let max_committed_epoch = register_int_gauge_with_registry!(
356 "storage_max_committed_epoch",
357 "max committed epoch",
358 registry
359 )
360 .unwrap();
361
362 let min_committed_epoch = register_int_gauge_with_registry!(
363 "storage_min_committed_epoch",
364 "min committed epoch",
365 registry
366 )
367 .unwrap();
368
369 let level_sst_num = register_int_gauge_vec_with_registry!(
370 "storage_level_sst_num",
371 "num of SSTs in each level",
372 &["level_index"],
373 registry
374 )
375 .unwrap();
376
377 let level_compact_cnt = register_int_gauge_vec_with_registry!(
378 "storage_level_compact_cnt",
379 "num of SSTs to be merged to next level in each level",
380 &["level_index"],
381 registry
382 )
383 .unwrap();
384
385 let compact_frequency = register_int_counter_vec_with_registry!(
386 "storage_level_compact_frequency",
387 "The number of compactions from one level to another level that have completed or failed.",
388 &["compactor", "group", "task_type", "result"],
389 registry
390 )
391 .unwrap();
392 let compact_skip_frequency = register_int_counter_vec_with_registry!(
393 "storage_skip_compact_frequency",
394 "The number of compactions from one level to another level that have been skipped.",
395 &["level", "type"],
396 registry
397 )
398 .unwrap();
399
400 let version_size =
401 register_int_gauge_with_registry!("storage_version_size", "version size", registry)
402 .unwrap();
403
404 let current_version_id = register_int_gauge_with_registry!(
405 "storage_current_version_id",
406 "current version id",
407 registry
408 )
409 .unwrap();
410
411 let checkpoint_version_id = register_int_gauge_with_registry!(
412 "storage_checkpoint_version_id",
413 "checkpoint version id",
414 registry
415 )
416 .unwrap();
417
418 let min_pinned_version_id = register_int_gauge_with_registry!(
419 "storage_min_pinned_version_id",
420 "min pinned version id",
421 registry
422 )
423 .unwrap();
424
425 let write_stop_compaction_groups = register_int_gauge_vec_with_registry!(
426 "storage_write_stop_compaction_groups",
427 "compaction groups of write stop state",
428 &["compaction_group_id"],
429 registry
430 )
431 .unwrap();
432
433 let full_gc_trigger_count = register_int_gauge_with_registry!(
434 "storage_full_gc_trigger_count",
435 "the number of attempts to trigger full GC",
436 registry
437 )
438 .unwrap();
439
440 let opts = histogram_opts!(
441 "storage_full_gc_candidate_object_count",
442 "the number of candidate object to delete after scanning object store",
443 exponential_buckets(1.0, 10.0, 6).unwrap()
444 );
445 let full_gc_candidate_object_count =
446 register_histogram_with_registry!(opts, registry).unwrap();
447
448 let opts = histogram_opts!(
449 "storage_full_gc_selected_object_count",
450 "the number of object to delete after filtering by meta node",
451 exponential_buckets(1.0, 10.0, 6).unwrap()
452 );
453 let full_gc_selected_object_count =
454 register_histogram_with_registry!(opts, registry).unwrap();
455
456 let min_safepoint_version_id = register_int_gauge_with_registry!(
457 "storage_min_safepoint_version_id",
458 "min safepoint version id",
459 registry
460 )
461 .unwrap();
462
463 let level_file_size = register_int_gauge_vec_with_registry!(
464 "storage_level_total_file_size",
465 "KBs total file bytes in each level",
466 &["level_index"],
467 registry
468 )
469 .unwrap();
470
471 let version_stats = register_int_gauge_vec_with_registry!(
472 "storage_version_stats",
473 "per table stats in current hummock version",
474 &["table_id", "metric"],
475 registry
476 )
477 .unwrap();
478
479 let materialized_view_stats = register_int_gauge_vec_with_registry!(
480 "storage_materialized_view_stats",
481 "per materialized view stats in current hummock version",
482 &["table_id", "metric"],
483 registry
484 )
485 .unwrap();
486
487 let stale_object_count = register_int_gauge_with_registry!(
488 "storage_stale_object_count",
489 "total number of objects that is no longer referenced by versions.",
490 registry
491 )
492 .unwrap();
493
494 let stale_object_size = register_int_gauge_with_registry!(
495 "storage_stale_object_size",
496 "total size of objects that is no longer referenced by versions.",
497 registry
498 )
499 .unwrap();
500
501 let old_version_object_count = register_int_gauge_with_registry!(
502 "storage_old_version_object_count",
503 "total number of objects that is still referenced by non-current versions",
504 registry
505 )
506 .unwrap();
507
508 let old_version_object_size = register_int_gauge_with_registry!(
509 "storage_old_version_object_size",
510 "total size of objects that is still referenced by non-current versions",
511 registry
512 )
513 .unwrap();
514
515 let current_version_object_count = register_int_gauge_with_registry!(
516 "storage_current_version_object_count",
517 "total number of objects that is referenced by current version",
518 registry
519 )
520 .unwrap();
521
522 let current_version_object_size = register_int_gauge_with_registry!(
523 "storage_current_version_object_size",
524 "total size of objects that is referenced by current version",
525 registry
526 )
527 .unwrap();
528
529 let total_object_count = register_int_gauge_with_registry!(
530 "storage_total_object_count",
531 "Total number of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
532 registry
533 ).unwrap();
534
535 let total_object_size = register_int_gauge_with_registry!(
536 "storage_total_object_size",
537 "Total size of objects that includes dangling objects. Note that the metric is updated right before full GC. So subsequent full GC may reduce the actual value significantly, without updating the metric.",
538 registry
539 ).unwrap();
540
541 let table_change_log_object_count = register_int_gauge_vec_with_registry!(
542 "storage_table_change_log_object_count",
543 "per table change log object count",
544 &["table_id"],
545 registry
546 )
547 .unwrap();
548
549 let table_change_log_object_size = register_int_gauge_vec_with_registry!(
550 "storage_table_change_log_object_size",
551 "per table change log object size",
552 &["table_id"],
553 registry
554 )
555 .unwrap();
556
557 let table_change_log_min_epoch = register_int_gauge_vec_with_registry!(
558 "storage_table_change_log_min_epoch",
559 "min epoch currently retained in table change log",
560 &["table_id"],
561 registry
562 )
563 .unwrap();
564
565 let time_travel_object_count = register_int_gauge_with_registry!(
566 "storage_time_travel_object_count",
567 "total number of objects that is referenced by time travel.",
568 registry
569 )
570 .unwrap();
571
572 let delta_log_count = register_int_gauge_with_registry!(
573 "storage_delta_log_count",
574 "total number of hummock version delta log",
575 registry
576 )
577 .unwrap();
578
579 let opts = histogram_opts!(
580 "storage_version_checkpoint_latency",
581 "hummock version checkpoint latency",
582 exponential_buckets(0.1, 1.5, 20).unwrap()
583 );
584 let version_checkpoint_latency = register_histogram_with_registry!(opts, registry).unwrap();
585
586 let hummock_manager_lock_time = register_histogram_vec_with_registry!(
587 "hummock_manager_lock_time",
588 "latency for hummock manager to acquire the rwlock",
589 &["lock_name", "lock_type"],
590 registry
591 )
592 .unwrap();
593
594 let hummock_manager_real_process_time = register_histogram_vec_with_registry!(
595 "meta_hummock_manager_real_process_time",
596 "latency for hummock manager to really process the request",
597 &["method"],
598 registry
599 )
600 .unwrap();
601
602 let worker_num = register_int_gauge_vec_with_registry!(
603 "worker_num",
604 "number of nodes in the cluster",
605 &["worker_type"],
606 registry,
607 )
608 .unwrap();
609
610 let meta_type = register_int_gauge_vec_with_registry!(
611 "meta_num",
612 "role of meta nodes in the cluster",
613 &["worker_addr", "role"],
614 registry,
615 )
616 .unwrap();
617
618 let compact_pending_bytes = register_int_gauge_vec_with_registry!(
619 "storage_compact_pending_bytes",
620 "bytes of lsm tree needed to reach balance",
621 &["group"],
622 registry
623 )
624 .unwrap();
625
626 let compact_level_compression_ratio = register_gauge_vec_with_registry!(
627 "storage_compact_level_compression_ratio",
628 "compression ratio of each level of the lsm tree",
629 &["group", "level", "algorithm"],
630 registry
631 )
632 .unwrap();
633
634 let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
635 "storage_level_compact_task_cnt",
636 "num of compact_task organized by group and level",
637 &["task"],
638 registry
639 )
640 .unwrap();
641
642 let time_travel_vacuum_metadata_latency = register_histogram_with_registry!(
643 histogram_opts!(
644 "storage_time_travel_vacuum_metadata_latency",
645 "Latency of vacuuming metadata for time travel",
646 exponential_buckets(0.1, 1.5, 20).unwrap()
647 ),
648 registry
649 )
650 .unwrap();
651 let time_travel_write_metadata_latency = register_histogram_with_registry!(
652 histogram_opts!(
653 "storage_time_travel_write_metadata_latency",
654 "Latency of writing metadata for time travel",
655 exponential_buckets(0.1, 1.5, 20).unwrap()
656 ),
657 registry
658 )
659 .unwrap();
660
661 let object_store_metric = Arc::new(GLOBAL_OBJECT_STORE_METRICS.clone());
662
663 let recovery_failure_cnt = register_int_counter_vec_with_registry!(
664 "recovery_failure_cnt",
665 "Number of failed recovery attempts",
666 &["recovery_type"],
667 registry
668 )
669 .unwrap();
670 let opts = histogram_opts!(
671 "recovery_latency",
672 "Latency of the recovery process",
673 exponential_buckets(0.1, 1.5, 20).unwrap() );
675 let recovery_latency =
676 register_histogram_vec_with_registry!(opts, &["recovery_type"], registry).unwrap();
677
678 let auto_schema_change_failure_cnt = register_guarded_int_counter_vec_with_registry!(
679 "auto_schema_change_failure_cnt",
680 "Number of failed auto schema change",
681 &["table_id", "table_name"],
682 registry
683 )
684 .unwrap();
685
686 let auto_schema_change_success_cnt = register_guarded_int_counter_vec_with_registry!(
687 "auto_schema_change_success_cnt",
688 "Number of success auto schema change",
689 &["table_id", "table_name"],
690 registry
691 )
692 .unwrap();
693
694 let opts = histogram_opts!(
695 "auto_schema_change_latency",
696 "Latency of the auto schema change process",
697 exponential_buckets(0.1, 1.5, 20).unwrap() );
699 let auto_schema_change_latency = register_guarded_histogram_vec_with_registry!(
700 opts,
701 &["table_id", "table_name"],
702 registry
703 )
704 .unwrap();
705
706 let source_is_up = register_guarded_int_gauge_vec_with_registry!(
707 "source_status_is_up",
708 "source is up or not",
709 &["source_id", "source_name"],
710 registry
711 )
712 .unwrap();
713 let source_enumerator_metrics = Arc::new(SourceEnumeratorMetrics::default());
714
715 let actor_info = register_int_gauge_vec_with_registry!(
716 "actor_info",
717 "Mapping from actor id to (fragment id, compute node)",
718 &["actor_id", "fragment_id", "compute_node"],
719 registry
720 )
721 .unwrap();
722
723 let table_info = register_int_gauge_vec_with_registry!(
724 "table_info",
725 "Mapping from table id to (actor id, table name)",
726 &[
727 "materialized_view_id",
728 "table_id",
729 "fragment_id",
730 "table_name",
731 "table_type",
732 "compaction_group_id"
733 ],
734 registry
735 )
736 .unwrap();
737
738 let sink_info = register_int_gauge_vec_with_registry!(
739 "sink_info",
740 "Mapping from actor id to (actor id, sink name)",
741 &["actor_id", "sink_id", "sink_name",],
742 registry
743 )
744 .unwrap();
745
746 let relation_info = register_int_gauge_vec_with_registry!(
747 "relation_info",
748 "Information of the database relation (table/source/sink/materialized view/index/internal)",
749 &["id", "database", "schema", "name", "resource_group", "type"],
750 registry
751 )
752 .unwrap();
753
754 let streaming_table_change_log_retention_seconds = register_int_gauge_vec_with_registry!(
755 "streaming_table_change_log_retention_seconds",
756 "Max subscription retention configured for the table change log in seconds",
757 &["table_id"],
758 registry
759 )
760 .unwrap();
761
762 let database_info = register_int_gauge_vec_with_registry!(
763 "database_info",
764 "Mapping from database id to database name",
765 &["database_id", "database_name"],
766 registry
767 )
768 .unwrap();
769
770 let backfill_fragment_progress = register_int_gauge_vec_with_registry!(
771 "backfill_fragment_progress",
772 "Backfill progress per fragment",
773 &[
774 "job_id",
775 "fragment_id",
776 "backfill_state_table_id",
777 "backfill_target_relation_id",
778 "backfill_target_relation_name",
779 "backfill_target_relation_type",
780 "backfill_type",
781 "backfill_epoch",
782 "upstream_type",
783 "backfill_progress",
784 ],
785 registry
786 )
787 .unwrap();
788
789 let l0_compact_level_count = register_histogram_vec_with_registry!(
790 "storage_l0_compact_level_count",
791 "level_count of l0 compact task",
792 &["group", "type"],
793 registry
794 )
795 .unwrap();
796
797 let system_param_info = register_int_gauge_vec_with_registry!(
799 "system_param_info",
800 "Information of system parameters",
801 &["name", "value"],
802 registry
803 )
804 .unwrap();
805
806 let opts = histogram_opts!(
807 "storage_compact_task_size",
808 "Total size of compact that have been issued to state store",
809 exponential_buckets(1048576.0, 2.0, 16).unwrap()
810 );
811
812 let compact_task_size =
813 register_histogram_vec_with_registry!(opts, &["group", "type"], registry).unwrap();
814
815 let compact_task_file_count = register_histogram_vec_with_registry!(
816 "storage_compact_task_file_count",
817 "file count of compact task",
818 &["group", "type"],
819 registry
820 )
821 .unwrap();
822 let opts = histogram_opts!(
823 "storage_compact_task_batch_count",
824 "count of compact task batch",
825 exponential_buckets(1.0, 2.0, 8).unwrap()
826 );
827 let compact_task_batch_count =
828 register_histogram_vec_with_registry!(opts, &["type"], registry).unwrap();
829
830 let table_write_throughput = register_int_counter_vec_with_registry!(
831 "storage_commit_write_throughput",
832 "The number of compactions from one level to another level that have been skipped.",
833 &["table_id"],
834 registry
835 )
836 .unwrap();
837
838 let split_compaction_group_count = register_int_counter_vec_with_registry!(
839 "storage_split_compaction_group_count",
840 "Count of trigger split compaction group",
841 &["group"],
842 registry
843 )
844 .unwrap();
845
846 let state_table_count = register_int_gauge_vec_with_registry!(
847 "storage_state_table_count",
848 "Count of stable table per compaction group",
849 &["group"],
850 registry
851 )
852 .unwrap();
853
854 let branched_sst_count = register_int_gauge_vec_with_registry!(
855 "storage_branched_sst_count",
856 "Count of branched sst per compaction group",
857 &["group"],
858 registry
859 )
860 .unwrap();
861
862 let opts = histogram_opts!(
863 "storage_compaction_event_consumed_latency",
864 "The latency(ms) of each event being consumed",
865 exponential_buckets(1.0, 1.5, 30).unwrap() );
867 let compaction_event_consumed_latency =
868 register_histogram_with_registry!(opts, registry).unwrap();
869
870 let opts = histogram_opts!(
871 "storage_compaction_event_loop_iteration_latency",
872 "The latency(ms) of each iteration of the compaction event loop",
873 exponential_buckets(1.0, 1.5, 30).unwrap() );
875 let compaction_event_loop_iteration_latency =
876 register_histogram_with_registry!(opts, registry).unwrap();
877
878 let merge_compaction_group_count = register_int_counter_vec_with_registry!(
879 "storage_merge_compaction_group_count",
880 "Count of trigger merge compaction group",
881 &["group"],
882 registry
883 )
884 .unwrap();
885
886 let opts = histogram_opts!(
887 "storage_time_travel_version_replay_latency",
888 "The latency(ms) of replaying a hummock version for time travel",
889 exponential_buckets(0.01, 10.0, 6).unwrap()
890 );
891 let time_travel_version_replay_latency =
892 register_histogram_with_registry!(opts, registry).unwrap();
893
894 let compaction_group_count = register_int_gauge_with_registry!(
895 "storage_compaction_group_count",
896 "The number of compaction groups",
897 registry,
898 )
899 .unwrap();
900
901 let compaction_group_size = register_int_gauge_vec_with_registry!(
902 "storage_compaction_group_size",
903 "The size of compaction group",
904 &["group"],
905 registry
906 )
907 .unwrap();
908
909 let compaction_group_file_count = register_int_gauge_vec_with_registry!(
910 "storage_compaction_group_file_count",
911 "The file count of compaction group",
912 &["group"],
913 registry
914 )
915 .unwrap();
916
917 let compaction_group_throughput = register_int_gauge_vec_with_registry!(
918 "storage_compaction_group_throughput",
919 "The throughput of compaction group",
920 &["group"],
921 registry
922 )
923 .unwrap();
924
925 let opts = histogram_opts!(
926 "storage_compact_task_trivial_move_sst_count",
927 "sst count of compact trivial-move task",
928 exponential_buckets(1.0, 2.0, 8).unwrap()
929 );
930 let compact_task_trivial_move_sst_count =
931 register_histogram_vec_with_registry!(opts, &["group"], registry).unwrap();
932
933 let refresh_job_duration = register_guarded_uint_gauge_vec_with_registry!(
934 "meta_refresh_job_duration",
935 "The duration of refresh job",
936 &["table_id", "status"],
937 registry
938 )
939 .unwrap();
940 let refresh_job_finish_cnt = register_guarded_int_counter_vec_with_registry!(
941 "meta_refresh_job_finish_cnt",
942 "The number of finished refresh jobs",
943 &["table_id", "status"],
944 registry
945 )
946 .unwrap();
947 let refresh_cron_job_trigger_cnt = register_guarded_int_counter_vec_with_registry!(
948 "meta_refresh_cron_job_trigger_cnt",
949 "The number of cron refresh jobs triggered",
950 &["table_id"],
951 registry
952 )
953 .unwrap();
954 let refresh_cron_job_miss_cnt = register_guarded_int_counter_vec_with_registry!(
955 "meta_refresh_cron_job_miss_cnt",
956 "The number of cron refresh jobs missed",
957 &["table_id"],
958 registry
959 )
960 .unwrap();
961
962 Self {
963 grpc_latency,
964 barrier_latency,
965 barrier_wait_commit_latency,
966 barrier_send_latency,
967 all_barrier_nums,
968 in_flight_barrier_nums,
969 last_committed_barrier_time,
970 barrier_interval_by_database,
971 snapshot_backfill_barrier_latency,
972 snapshot_backfill_lag,
973 snapshot_backfill_inflight_barrier_num,
974 recovery_failure_cnt,
975 recovery_latency,
976
977 max_committed_epoch,
978 min_committed_epoch,
979 level_sst_num,
980 level_compact_cnt,
981 compact_frequency,
982 compact_skip_frequency,
983 level_file_size,
984 version_size,
985 version_stats,
986 materialized_view_stats,
987 stale_object_count,
988 stale_object_size,
989 old_version_object_count,
990 old_version_object_size,
991 time_travel_object_count,
992 current_version_object_count,
993 current_version_object_size,
994 total_object_count,
995 total_object_size,
996 table_change_log_object_count,
997 table_change_log_object_size,
998 table_change_log_min_epoch,
999 delta_log_count,
1000 version_checkpoint_latency,
1001 current_version_id,
1002 checkpoint_version_id,
1003 min_pinned_version_id,
1004 min_safepoint_version_id,
1005 write_stop_compaction_groups,
1006 full_gc_trigger_count,
1007 full_gc_candidate_object_count,
1008 full_gc_selected_object_count,
1009 hummock_manager_lock_time,
1010 hummock_manager_real_process_time,
1011 time_after_last_observation: Arc::new(AtomicU64::new(0)),
1012 worker_num,
1013 meta_type,
1014 compact_pending_bytes,
1015 compact_level_compression_ratio,
1016 level_compact_task_cnt,
1017 object_store_metric,
1018 source_is_up,
1019 source_enumerator_metrics,
1020 actor_info,
1021 table_info,
1022 sink_info,
1023 relation_info,
1024 database_info,
1025 backfill_fragment_progress,
1026 streaming_table_change_log_retention_seconds,
1027 system_param_info,
1028 l0_compact_level_count,
1029 compact_task_size,
1030 compact_task_file_count,
1031 compact_task_batch_count,
1032 compact_task_trivial_move_sst_count,
1033 table_write_throughput,
1034 split_compaction_group_count,
1035 state_table_count,
1036 branched_sst_count,
1037 compaction_event_consumed_latency,
1038 compaction_event_loop_iteration_latency,
1039 auto_schema_change_failure_cnt,
1040 auto_schema_change_success_cnt,
1041 auto_schema_change_latency,
1042 merge_compaction_group_count,
1043 time_travel_version_replay_latency,
1044 compaction_group_count,
1045 compaction_group_size,
1046 compaction_group_file_count,
1047 compaction_group_throughput,
1048 refresh_job_duration,
1049 refresh_job_finish_cnt,
1050 refresh_cron_job_trigger_cnt,
1051 refresh_cron_job_miss_cnt,
1052 time_travel_vacuum_metadata_latency,
1053 time_travel_write_metadata_latency,
1054 }
1055 }
1056
1057 #[cfg(test)]
1058 pub fn for_test(registry: &Registry) -> Self {
1059 Self::new(registry)
1060 }
1061}
1062impl Default for MetaMetrics {
1063 fn default() -> Self {
1064 GLOBAL_META_METRICS.clone()
1065 }
1066}
1067
1068pub async fn refresh_system_param_info_metrics(
1070 system_params_controller: &SystemParamsControllerRef,
1071 meta_metrics: Arc<MetaMetrics>,
1072) {
1073 let params_info = system_params_controller.get_params().await.get_all();
1074
1075 meta_metrics.system_param_info.reset();
1076 for info in params_info {
1077 meta_metrics
1078 .system_param_info
1079 .with_label_values(&[info.name, &info.value])
1080 .set(1);
1081 }
1082}
1083
1084pub fn start_worker_info_monitor(
1085 metadata_manager: MetadataManager,
1086 election_client: ElectionClientRef,
1087 interval: Duration,
1088 meta_metrics: Arc<MetaMetrics>,
1089) -> (JoinHandle<()>, Sender<()>) {
1090 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1091 let join_handle = tokio::spawn(async move {
1092 let mut monitor_interval = tokio::time::interval(interval);
1093 monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1094 loop {
1095 tokio::select! {
1096 _ = monitor_interval.tick() => {},
1098 _ = &mut shutdown_rx => {
1100 tracing::info!("Worker number monitor is stopped");
1101 return;
1102 }
1103 }
1104
1105 let node_map = match metadata_manager.count_worker_node().await {
1106 Ok(node_map) => node_map,
1107 Err(err) => {
1108 tracing::warn!(error = %err.as_report(), "fail to count worker node");
1109 continue;
1110 }
1111 };
1112
1113 meta_metrics.worker_num.reset();
1115 meta_metrics.meta_type.reset();
1116
1117 for (worker_type, worker_num) in node_map {
1118 meta_metrics
1119 .worker_num
1120 .with_label_values(&[(worker_type.as_str_name())])
1121 .set(worker_num as i64);
1122 }
1123 if let Ok(meta_members) = election_client.get_members().await {
1124 meta_metrics
1125 .worker_num
1126 .with_label_values(&[WorkerType::Meta.as_str_name()])
1127 .set(meta_members.len() as i64);
1128 meta_members.into_iter().for_each(|m| {
1129 let role = if m.is_leader { "leader" } else { "follower" };
1130 meta_metrics
1131 .meta_type
1132 .with_label_values(&[m.id.as_str(), role])
1133 .set(1);
1134 });
1135 }
1136 }
1137 });
1138
1139 (join_handle, shutdown_tx)
1140}
1141
1142pub async fn refresh_fragment_info_metrics(
1143 catalog_controller: &CatalogControllerRef,
1144 cluster_controller: &ClusterControllerRef,
1145 hummock_manager: &HummockManagerRef,
1146 meta_metrics: Arc<MetaMetrics>,
1147) {
1148 let worker_nodes = match cluster_controller
1149 .list_workers(Some(WorkerType::ComputeNode.into()), None)
1150 .await
1151 {
1152 Ok(worker_nodes) => worker_nodes,
1153 Err(err) => {
1154 tracing::warn!(error=%err.as_report(), "fail to list worker node");
1155 return;
1156 }
1157 };
1158 let actor_locations = match catalog_controller.list_actor_locations() {
1159 Ok(actor_locations) => actor_locations,
1160 Err(err) => {
1161 tracing::warn!(error=%err.as_report(), "fail to get actor locations");
1162 return;
1163 }
1164 };
1165 let sink_actor_mapping = match catalog_controller.list_sink_actor_mapping().await {
1166 Ok(sink_actor_mapping) => sink_actor_mapping,
1167 Err(err) => {
1168 tracing::warn!(error=%err.as_report(), "fail to get sink actor mapping");
1169 return;
1170 }
1171 };
1172 let fragment_state_tables = match catalog_controller.list_fragment_state_tables().await {
1173 Ok(fragment_state_tables) => fragment_state_tables,
1174 Err(err) => {
1175 tracing::warn!(error=%err.as_report(), "fail to get fragment state tables");
1176 return;
1177 }
1178 };
1179 let table_name_and_type_mapping = match catalog_controller.get_table_name_type_mapping().await {
1180 Ok(mapping) => mapping,
1181 Err(err) => {
1182 tracing::warn!(error=%err.as_report(), "fail to get table name mapping");
1183 return;
1184 }
1185 };
1186
1187 let worker_addr_mapping: HashMap<WorkerId, String> = worker_nodes
1188 .into_iter()
1189 .map(|worker_node| {
1190 let addr = match worker_node.host {
1191 Some(host) => format!("{}:{}", host.host, host.port),
1192 None => "".to_owned(),
1193 };
1194 (worker_node.id, addr)
1195 })
1196 .collect();
1197 let table_compaction_group_id_mapping = hummock_manager
1198 .get_table_compaction_group_id_mapping()
1199 .await;
1200
1201 meta_metrics.actor_info.reset();
1204 meta_metrics.table_info.reset();
1205 meta_metrics.sink_info.reset();
1206 for actor_location in actor_locations {
1207 let actor_id_str = actor_location.actor_id.to_string();
1208 let fragment_id_str = actor_location.fragment_id.to_string();
1209 if let Some(address) = worker_addr_mapping.get(&actor_location.worker_id) {
1212 meta_metrics
1213 .actor_info
1214 .with_label_values(&[&actor_id_str, &fragment_id_str, address])
1215 .set(1);
1216 }
1217 }
1218 for (sink_id, (sink_name, actor_ids)) in sink_actor_mapping {
1219 let sink_id_str = sink_id.to_string();
1220 for actor_id in actor_ids {
1221 let actor_id_str = actor_id.to_string();
1222 meta_metrics
1223 .sink_info
1224 .with_label_values(&[&actor_id_str, &sink_id_str, &sink_name])
1225 .set(1);
1226 }
1227 }
1228 for PartialFragmentStateTables {
1229 fragment_id,
1230 job_id,
1231 state_table_ids,
1232 } in fragment_state_tables
1233 {
1234 let fragment_id_str = fragment_id.to_string();
1235 let job_id_str = job_id.to_string();
1236 for table_id in state_table_ids.into_inner() {
1237 let table_id_str = table_id.to_string();
1238 let (table_name, table_type) = table_name_and_type_mapping
1239 .get(&table_id)
1240 .cloned()
1241 .unwrap_or_else(|| ("unknown".to_owned(), "unknown".to_owned()));
1242 let compaction_group_id = table_compaction_group_id_mapping
1243 .get(&table_id)
1244 .map(|cg_id| cg_id.to_string())
1245 .unwrap_or_else(|| "unknown".to_owned());
1246 meta_metrics
1247 .table_info
1248 .with_label_values(&[
1249 &job_id_str,
1250 &table_id_str,
1251 &fragment_id_str,
1252 &table_name,
1253 &table_type,
1254 &compaction_group_id,
1255 ])
1256 .set(1);
1257 }
1258 }
1259}
1260
1261pub async fn refresh_relation_info_metrics(
1262 catalog_controller: &CatalogControllerRef,
1263 meta_metrics: Arc<MetaMetrics>,
1264) {
1265 let table_objects = match catalog_controller.list_table_objects().await {
1266 Ok(table_objects) => table_objects,
1267 Err(err) => {
1268 tracing::warn!(error=%err.as_report(), "fail to get table objects");
1269 return;
1270 }
1271 };
1272
1273 let source_objects = match catalog_controller.list_source_objects().await {
1274 Ok(source_objects) => source_objects,
1275 Err(err) => {
1276 tracing::warn!(error=%err.as_report(), "fail to get source objects");
1277 return;
1278 }
1279 };
1280
1281 let sink_objects = match catalog_controller.list_sink_objects().await {
1282 Ok(sink_objects) => sink_objects,
1283 Err(err) => {
1284 tracing::warn!(error=%err.as_report(), "fail to get sink objects");
1285 return;
1286 }
1287 };
1288 let subscriptions = match catalog_controller.list_subscriptions().await {
1289 Ok(subscriptions) => subscriptions,
1290 Err(err) => {
1291 tracing::warn!(error=%err.as_report(), "fail to get subscription objects");
1292 return;
1293 }
1294 };
1295
1296 meta_metrics.relation_info.reset();
1297 meta_metrics
1298 .streaming_table_change_log_retention_seconds
1299 .reset();
1300
1301 for (id, db, schema, name, resource_group, table_type) in table_objects {
1302 let relation_type = match table_type {
1303 TableType::Table => "table",
1304 TableType::MaterializedView => "materialized_view",
1305 TableType::Index | TableType::VectorIndex => "index",
1306 TableType::Internal => "internal",
1307 };
1308 meta_metrics
1309 .relation_info
1310 .with_label_values(&[
1311 &id.to_string(),
1312 &db,
1313 &schema,
1314 &name,
1315 &resource_group,
1316 &relation_type.to_owned(),
1317 ])
1318 .set(1);
1319 }
1320
1321 for (id, db, schema, name, resource_group) in source_objects {
1322 meta_metrics
1323 .relation_info
1324 .with_label_values(&[
1325 &id.to_string(),
1326 &db,
1327 &schema,
1328 &name,
1329 &resource_group,
1330 &"source".to_owned(),
1331 ])
1332 .set(1);
1333 }
1334
1335 for (id, db, schema, name, resource_group) in sink_objects {
1336 meta_metrics
1337 .relation_info
1338 .with_label_values(&[
1339 &id.to_string(),
1340 &db,
1341 &schema,
1342 &name,
1343 &resource_group,
1344 &"sink".to_owned(),
1345 ])
1346 .set(1);
1347 }
1348
1349 let mut max_retention_by_table = HashMap::new();
1350 for subscription in subscriptions {
1351 max_retention_by_table
1352 .entry(subscription.dependent_table_id)
1353 .and_modify(|retention: &mut u64| {
1354 *retention = (*retention).max(subscription.retention_seconds);
1355 })
1356 .or_insert(subscription.retention_seconds);
1357 }
1358 for (table_id, retention_seconds) in max_retention_by_table {
1359 meta_metrics
1360 .streaming_table_change_log_retention_seconds
1361 .with_label_values(&[&table_id.to_string()])
1362 .set(retention_seconds as _);
1363 }
1364}
1365
1366pub async fn refresh_database_info_metrics(
1367 catalog_controller: &CatalogControllerRef,
1368 meta_metrics: Arc<MetaMetrics>,
1369) {
1370 let databases = match catalog_controller.list_databases().await {
1371 Ok(databases) => databases,
1372 Err(err) => {
1373 tracing::warn!(error=%err.as_report(), "fail to get databases");
1374 return;
1375 }
1376 };
1377
1378 meta_metrics.database_info.reset();
1379
1380 for db in databases {
1381 meta_metrics
1382 .database_info
1383 .with_label_values(&[&db.id.to_string(), &db.name])
1384 .set(1);
1385 }
1386}
1387
1388fn extract_backfill_fragment_info(
1389 distribution: &FragmentDistribution,
1390) -> Option<BackfillFragmentInfo> {
1391 let backfill_type =
1392 if distribution.fragment_type_mask & FragmentTypeFlag::SourceScan as u32 != 0 {
1393 "SOURCE"
1394 } else if distribution.fragment_type_mask
1395 & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32
1396 | FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32)
1397 != 0
1398 {
1399 "SNAPSHOT_BACKFILL"
1400 } else if distribution.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0 {
1401 "ARRANGEMENT_OR_NO_SHUFFLE"
1402 } else {
1403 return None;
1404 };
1405
1406 let stream_node = distribution.node.as_ref()?;
1407 let mut info = None;
1408 match backfill_type {
1409 "SOURCE" => {
1410 visit_stream_node_source_backfill(stream_node, |node| {
1411 info = Some(BackfillFragmentInfo {
1412 job_id: distribution.table_id.as_raw_id(),
1413 fragment_id: distribution.fragment_id.as_raw_id(),
1414 backfill_state_table_id: node
1415 .state_table
1416 .as_ref()
1417 .map(|table| table.id.as_raw_id())
1418 .unwrap_or_default(),
1419 backfill_target_relation_id: node.upstream_source_id.as_raw_id(),
1420 backfill_type,
1421 backfill_epoch: 0,
1422 });
1423 });
1424 }
1425 "SNAPSHOT_BACKFILL" | "ARRANGEMENT_OR_NO_SHUFFLE" => {
1426 visit_stream_node_stream_scan(stream_node, |node| {
1427 info = Some(BackfillFragmentInfo {
1428 job_id: distribution.table_id.as_raw_id(),
1429 fragment_id: distribution.fragment_id.as_raw_id(),
1430 backfill_state_table_id: node
1431 .state_table
1432 .as_ref()
1433 .map(|table| table.id.as_raw_id())
1434 .unwrap_or_default(),
1435 backfill_target_relation_id: node.table_id.as_raw_id(),
1436 backfill_type,
1437 backfill_epoch: node.snapshot_backfill_epoch.unwrap_or_default(),
1438 });
1439 });
1440 }
1441 _ => {}
1442 }
1443
1444 info
1445}
1446
1447pub async fn refresh_backfill_progress_metrics(
1448 catalog_controller: &CatalogControllerRef,
1449 hummock_manager: &HummockManagerRef,
1450 barrier_manager: &BarrierManagerRef,
1451 meta_metrics: Arc<MetaMetrics>,
1452) {
1453 let fragment_descs = match catalog_controller.list_fragment_descs_with_node(true).await {
1454 Ok(fragment_descs) => fragment_descs,
1455 Err(err) => {
1456 tracing::warn!(error=%err.as_report(), "fail to list creating fragment descs");
1457 return;
1458 }
1459 };
1460
1461 let backfill_infos: HashMap<(u32, u32), BackfillFragmentInfo> = fragment_descs
1462 .iter()
1463 .filter_map(|(distribution, _)| extract_backfill_fragment_info(distribution))
1464 .map(|info| ((info.job_id, info.fragment_id), info))
1465 .collect();
1466
1467 let fragment_progresses = match barrier_manager.get_fragment_backfill_progress().await {
1468 Ok(progress) => progress,
1469 Err(err) => {
1470 tracing::warn!(error=%err.as_report(), "fail to get fragment backfill progress");
1471 return;
1472 }
1473 };
1474
1475 let progress_by_fragment: HashMap<(u32, u32), _> = fragment_progresses
1476 .into_iter()
1477 .map(|progress| {
1478 (
1479 (
1480 progress.job_id.as_raw_id(),
1481 progress.fragment_id.as_raw_id(),
1482 ),
1483 progress,
1484 )
1485 })
1486 .collect();
1487
1488 let version_stats = hummock_manager.get_version_stats().await;
1489
1490 let relation_ids: HashSet<_> = backfill_infos
1491 .values()
1492 .map(|info| ObjectId::new(info.backfill_target_relation_id))
1493 .collect();
1494 let relation_objects = match catalog_controller
1495 .list_relation_objects_by_ids(&relation_ids)
1496 .await
1497 {
1498 Ok(relation_objects) => relation_objects,
1499 Err(err) => {
1500 tracing::warn!(error=%err.as_report(), "fail to get relation objects");
1501 return;
1502 }
1503 };
1504
1505 let mut relation_info = HashMap::new();
1506 for (id, db, schema, name, rel_type) in relation_objects {
1507 relation_info.insert(id.as_raw_id(), (db, schema, name, rel_type));
1508 }
1509
1510 meta_metrics.backfill_fragment_progress.reset();
1511
1512 for info in backfill_infos.values() {
1513 let progress = progress_by_fragment.get(&(info.job_id, info.fragment_id));
1514 let (db, schema, name, rel_type) = relation_info
1515 .get(&info.backfill_target_relation_id)
1516 .cloned()
1517 .unwrap_or_else(|| {
1518 (
1519 "unknown".to_owned(),
1520 "unknown".to_owned(),
1521 "unknown".to_owned(),
1522 "unknown".to_owned(),
1523 )
1524 });
1525
1526 let job_id_str = info.job_id.to_string();
1527 let fragment_id_str = info.fragment_id.to_string();
1528 let backfill_state_table_id_str = info.backfill_state_table_id.to_string();
1529 let backfill_target_relation_id_str = info.backfill_target_relation_id.to_string();
1530 let backfill_target_relation_name_str = format!("{db}.{schema}.{name}");
1531 let backfill_target_relation_type_str = rel_type;
1532 let backfill_type_str = info.backfill_type.to_owned();
1533 let backfill_epoch_str = info.backfill_epoch.to_string();
1534 let total_key_count = version_stats
1535 .table_stats
1536 .get(&TableId::new(info.backfill_target_relation_id))
1537 .map(|stats| stats.total_key_count as u64);
1538
1539 let progress_label = match (info.backfill_type, progress) {
1540 ("SOURCE", Some(progress)) => format!("{} consumed rows", progress.consumed_rows),
1541 ("SOURCE", None) => "0 consumed rows".to_owned(),
1542 (_, Some(progress)) if progress.done => {
1543 let total = total_key_count.unwrap_or(0);
1544 format!("100.0000% ({}/{})", total, total)
1545 }
1546 (_, Some(progress)) => {
1547 let total = total_key_count.unwrap_or(0);
1548 if total == 0 {
1549 "100.0000% (0/0)".to_owned()
1550 } else {
1551 let raw = (progress.consumed_rows as f64) / (total as f64) * 100.0;
1552 format!(
1553 "{:.4}% ({}/{})",
1554 raw.min(100.0),
1555 progress.consumed_rows,
1556 total
1557 )
1558 }
1559 }
1560 (_, None) => "0.0000% (0/0)".to_owned(),
1561 };
1562 let upstream_type_str = progress
1563 .map(|progress| progress.upstream_type.to_string())
1564 .unwrap_or_else(|| "Unknown".to_owned());
1565
1566 meta_metrics
1567 .backfill_fragment_progress
1568 .with_label_values(&[
1569 &job_id_str,
1570 &fragment_id_str,
1571 &backfill_state_table_id_str,
1572 &backfill_target_relation_id_str,
1573 &backfill_target_relation_name_str,
1574 &backfill_target_relation_type_str,
1575 &backfill_type_str,
1576 &backfill_epoch_str,
1577 &upstream_type_str,
1578 &progress_label,
1579 ])
1580 .set(1);
1581 }
1582}
1583
1584pub fn start_info_monitor(
1585 metadata_manager: MetadataManager,
1586 hummock_manager: HummockManagerRef,
1587 barrier_manager: BarrierManagerRef,
1588 system_params_controller: SystemParamsControllerRef,
1589 meta_metrics: Arc<MetaMetrics>,
1590) -> (JoinHandle<()>, Sender<()>) {
1591 const COLLECT_INTERVAL_SECONDS: u64 = 60;
1592
1593 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1594 let join_handle = tokio::spawn(async move {
1595 let mut monitor_interval =
1596 tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
1597 monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
1598 loop {
1599 tokio::select! {
1600 _ = monitor_interval.tick() => {},
1602 _ = &mut shutdown_rx => {
1604 tracing::info!("Meta info monitor is stopped");
1605 return;
1606 }
1607 }
1608
1609 refresh_fragment_info_metrics(
1611 &metadata_manager.catalog_controller,
1612 &metadata_manager.cluster_controller,
1613 &hummock_manager,
1614 meta_metrics.clone(),
1615 )
1616 .await;
1617
1618 refresh_relation_info_metrics(
1619 &metadata_manager.catalog_controller,
1620 meta_metrics.clone(),
1621 )
1622 .await;
1623
1624 refresh_database_info_metrics(
1625 &metadata_manager.catalog_controller,
1626 meta_metrics.clone(),
1627 )
1628 .await;
1629
1630 refresh_backfill_progress_metrics(
1631 &metadata_manager.catalog_controller,
1632 &hummock_manager,
1633 &barrier_manager,
1634 meta_metrics.clone(),
1635 )
1636 .await;
1637
1638 refresh_system_param_info_metrics(&system_params_controller, meta_metrics.clone())
1640 .await;
1641 }
1642 });
1643
1644 (join_handle, shutdown_tx)
1645}