// risingwave_connector/source/monitor/metrics.rs

use std::sync::{Arc, LazyLock};

use prometheus::{Registry, exponential_buckets, histogram_opts};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};

use crate::source::kafka::stats::RdKafkaStats;
/// Metrics reported by source split enumerators.
///
/// All handles are label-guarded vecs from `risingwave_common::metrics`;
/// cloning this struct clones the handles, not the underlying series.
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    // Kafka high watermark, labeled by (source_id, partition).
    pub high_watermark: LabelGuardedIntGaugeVec,
    // PostgreSQL CDC: confirmed flush LSN of the replication slot,
    // labeled by (source_id, slot_name).
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
    // PostgreSQL CDC: upstream max LSN (`pg_current_wal_lsn`),
    // labeled by (source_id, slot_name).
    pub pg_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
    // MySQL CDC: oldest upstream binlog file sequence number,
    // labeled by (hostname, port).
    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
    // MySQL CDC: newest upstream binlog file sequence number,
    // labeled by (hostname, port).
    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
}
41
/// Process-wide [`EnumeratorMetrics`], lazily registered on
/// [`GLOBAL_METRICS_REGISTRY`] at first access.
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
44
45impl EnumeratorMetrics {
46 fn new(registry: &Registry) -> Self {
47 let high_watermark = register_guarded_int_gauge_vec_with_registry!(
48 "source_kafka_high_watermark",
49 "High watermark for a exec per partition",
50 &["source_id", "partition"],
51 registry,
52 )
53 .unwrap();
54
55 let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
56 "pg_cdc_confirmed_flush_lsn",
57 "PostgreSQL CDC confirmed flush LSN",
58 &["source_id", "slot_name"],
59 registry,
60 )
61 .unwrap();
62
63 let pg_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
64 "pg_cdc_upstream_max_lsn",
65 "PostgreSQL CDC upstream max LSN (pg_current_wal_lsn)",
66 &["source_id", "slot_name"],
67 registry,
68 )
69 .unwrap();
70
71 let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
72 "mysql_cdc_binlog_file_seq_min",
73 "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
74 &["hostname", "port"],
75 registry,
76 )
77 .unwrap();
78
79 let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
80 "mysql_cdc_binlog_file_seq_max",
81 "MySQL CDC upstream binlog file sequence number (maximum/newest)",
82 &["hostname", "port"],
83 registry,
84 )
85 .unwrap();
86
87 EnumeratorMetrics {
88 high_watermark,
89 pg_cdc_confirmed_flush_lsn,
90 pg_cdc_upstream_max_lsn,
91 mysql_cdc_binlog_file_seq_min,
92 mysql_cdc_binlog_file_seq_max,
93 }
94 }
95
96 pub fn unused() -> Self {
97 Default::default()
98 }
99}
100
101impl Default for EnumeratorMetrics {
102 fn default() -> Self {
103 GLOBAL_ENUMERATOR_METRICS.clone()
104 }
105}
106
/// Metrics reported by source readers/executors.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    // Rows read per partition, labeled by
    // (actor_id, source_id, partition, source_name, fragment_id).
    pub partition_input_count: LabelGuardedIntCounterVec,

    // Bytes read per partition, same labels as `partition_input_count`.
    pub partition_input_bytes: LabelGuardedIntCounterVec,
    // Latest message id seen, labeled by (source_id, actor_id, partition).
    pub latest_message_id: LabelGuardedIntGaugeVec,
    // Stats collected from librdkafka's native statistics callback
    // (see `crate::source::kafka::stats::RdKafkaStats`).
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    // Histogram `source_cdc_event_lag_duration_milliseconds`, labeled by table_name.
    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    // Rows set to null (skipped) while reading parquet in file source.
    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    // Rows read in file source.
    pub file_source_input_row_count: LabelGuardedIntCounterVec,
    // Current number of dirty file splits (gauge).
    pub file_source_dirty_split_count: LabelGuardedIntGaugeVec,
    // Total file splits marked dirty (counter).
    pub file_source_failed_split_count: LabelGuardedIntCounterVec,

    // Kinesis-specific metrics below, all labeled by
    // (source_id, source_name, fragment_id, shard_id).
    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,
}
132
/// Process-wide [`SourceMetrics`], lazily registered on
/// [`GLOBAL_METRICS_REGISTRY`] at first access.
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
135
136impl SourceMetrics {
137 fn new(registry: &Registry) -> Self {
138 let partition_input_count = register_guarded_int_counter_vec_with_registry!(
139 "source_partition_input_count",
140 "Total number of rows that have been input from specific partition",
141 &[
142 "actor_id",
143 "source_id",
144 "partition",
145 "source_name",
146 "fragment_id"
147 ],
148 registry
149 )
150 .unwrap();
151 let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
152 "source_partition_input_bytes",
153 "Total bytes that have been input from specific partition",
154 &[
155 "actor_id",
156 "source_id",
157 "partition",
158 "source_name",
159 "fragment_id"
160 ],
161 registry
162 )
163 .unwrap();
164 let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
165 "source_latest_message_id",
166 "Latest message id for a exec per partition",
167 &["source_id", "actor_id", "partition"],
168 registry,
169 )
170 .unwrap();
171
172 let opts = histogram_opts!(
173 "source_cdc_event_lag_duration_milliseconds",
174 "source_cdc_lag_latency",
175 exponential_buckets(1.0, 2.0, 21).unwrap(), );
177
178 let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
179 "parquet_source_skip_row_count",
180 "Total number of rows that have been set to null in parquet source",
181 &["actor_id", "source_id", "source_name", "fragment_id"],
182 registry
183 )
184 .unwrap();
185
186 let direct_cdc_event_lag_latency =
187 register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
188
189 let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
190
191 let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
192 "file_source_input_row_count",
193 "Total number of rows that have been read in file source",
194 &["source_id", "source_name", "actor_id", "fragment_id"],
195 registry
196 )
197 .unwrap();
198 let file_source_dirty_split_count = register_guarded_int_gauge_vec_with_registry!(
199 "file_source_dirty_split_count",
200 "Current number of dirty file splits in file source",
201 &["source_id", "source_name", "actor_id", "fragment_id"],
202 registry
203 )
204 .unwrap();
205 let file_source_failed_split_count = register_guarded_int_counter_vec_with_registry!(
206 "file_source_failed_split_count",
207 "Total number of file splits marked dirty in file source",
208 &["source_id", "source_name", "actor_id", "fragment_id"],
209 registry
210 )
211 .unwrap();
212
213 let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
214 "kinesis_throughput_exceeded_count",
215 "Total number of times throughput exceeded in kinesis source",
216 &["source_id", "source_name", "fragment_id", "shard_id"],
217 registry
218 )
219 .unwrap();
220
221 let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
222 "kinesis_timeout_count",
223 "Total number of times timeout in kinesis source",
224 &["source_id", "source_name", "fragment_id", "shard_id"],
225 registry
226 )
227 .unwrap();
228
229 let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
230 "kinesis_rebuild_shard_iter_count",
231 "Total number of times rebuild shard iter in kinesis source",
232 &["source_id", "source_name", "fragment_id", "shard_id"],
233 registry
234 )
235 .unwrap();
236
237 let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
238 "kinesis_early_terminate_shard_count",
239 "Total number of times early terminate shard in kinesis source",
240 &["source_id", "source_name", "fragment_id", "shard_id"],
241 registry
242 )
243 .unwrap();
244
245 let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
246 "kinesis_lag_latency_ms",
247 "Lag latency in kinesis source",
248 &["source_id", "source_name", "fragment_id", "shard_id"],
249 registry
250 )
251 .unwrap();
252
253 SourceMetrics {
254 partition_input_count,
255 partition_input_bytes,
256 latest_message_id,
257 rdkafka_native_metric,
258 direct_cdc_event_lag_latency,
259 parquet_source_skip_row_count,
260 file_source_input_row_count,
261 file_source_dirty_split_count,
262 file_source_failed_split_count,
263
264 kinesis_throughput_exceeded_count,
265 kinesis_timeout_count,
266 kinesis_rebuild_shard_iter_count,
267 kinesis_early_terminate_shard_count,
268 kinesis_lag_latency_ms,
269 }
270 }
271}
272
273impl Default for SourceMetrics {
274 fn default() -> Self {
275 GLOBAL_SOURCE_METRICS.clone()
276 }
277}