risingwave_connector/source/monitor/
metrics.rs1use std::sync::{Arc, LazyLock};
16
17use prometheus::{Registry, exponential_buckets, histogram_opts};
18use risingwave_common::metrics::{
19 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
20};
21use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
22use risingwave_common::{
23 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
24 register_guarded_int_gauge_vec_with_registry,
25};
26
27use crate::source::kafka::stats::RdKafkaStats;
28
29#[derive(Debug, Clone)]
30pub struct EnumeratorMetrics {
31 pub high_watermark: LabelGuardedIntGaugeVec<2>,
32}
33
34pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
35 LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
36
37impl EnumeratorMetrics {
38 fn new(registry: &Registry) -> Self {
39 let high_watermark = register_guarded_int_gauge_vec_with_registry!(
40 "source_kafka_high_watermark",
41 "High watermark for a exec per partition",
42 &["source_id", "partition"],
43 registry,
44 )
45 .unwrap();
46 EnumeratorMetrics { high_watermark }
47 }
48
49 pub fn unused() -> Self {
50 Default::default()
51 }
52}
53
54impl Default for EnumeratorMetrics {
55 fn default() -> Self {
56 GLOBAL_ENUMERATOR_METRICS.clone()
57 }
58}
59
60#[derive(Debug, Clone)]
61pub struct SourceMetrics {
62 pub partition_input_count: LabelGuardedIntCounterVec<5>,
63
64 pub partition_input_bytes: LabelGuardedIntCounterVec<5>,
67 pub latest_message_id: LabelGuardedIntGaugeVec<3>,
69 pub rdkafka_native_metric: Arc<RdKafkaStats>,
70
71 pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec<1>,
72
73 pub parquet_source_skip_row_count: LabelGuardedIntCounterVec<4>,
74 pub file_source_input_row_count: LabelGuardedIntCounterVec<4>,
75}
76
77pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
78 LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
79
80impl SourceMetrics {
81 fn new(registry: &Registry) -> Self {
82 let partition_input_count = register_guarded_int_counter_vec_with_registry!(
83 "source_partition_input_count",
84 "Total number of rows that have been input from specific partition",
85 &[
86 "actor_id",
87 "source_id",
88 "partition",
89 "source_name",
90 "fragment_id"
91 ],
92 registry
93 )
94 .unwrap();
95 let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
96 "source_partition_input_bytes",
97 "Total bytes that have been input from specific partition",
98 &[
99 "actor_id",
100 "source_id",
101 "partition",
102 "source_name",
103 "fragment_id"
104 ],
105 registry
106 )
107 .unwrap();
108 let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
109 "source_latest_message_id",
110 "Latest message id for a exec per partition",
111 &["source_id", "actor_id", "partition"],
112 registry,
113 )
114 .unwrap();
115
116 let opts = histogram_opts!(
117 "source_cdc_event_lag_duration_milliseconds",
118 "source_cdc_lag_latency",
119 exponential_buckets(1.0, 2.0, 21).unwrap(), );
121
122 let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
123 "parquet_source_skip_row_count",
124 "Total number of rows that have been set to null in parquet source",
125 &["actor_id", "source_id", "source_name", "fragment_id"],
126 registry
127 )
128 .unwrap();
129
130 let direct_cdc_event_lag_latency =
131 register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
132
133 let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
134
135 let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
136 "file_source_input_row_count",
137 "Total number of rows that have been read in file source",
138 &["source_id", "source_name", "actor_id", "fragment_id"],
139 registry
140 )
141 .unwrap();
142 SourceMetrics {
143 partition_input_count,
144 partition_input_bytes,
145 latest_message_id,
146 rdkafka_native_metric,
147 direct_cdc_event_lag_latency,
148 parquet_source_skip_row_count,
149 file_source_input_row_count,
150 }
151 }
152}
153
154impl Default for SourceMetrics {
155 fn default() -> Self {
156 GLOBAL_SOURCE_METRICS.clone()
157 }
158}