use std::sync::{Arc, LazyLock};

use prometheus::{
    IntCounterVec, Registry, exponential_buckets, histogram_opts,
    register_int_counter_vec_with_registry,
};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};

use crate::source::kafka::stats::RdKafkaStats;

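/// Metrics reported by source split enumerators: upstream progress gauges such as
/// the Kafka high watermark and the CDC LSN / binlog-file positions per upstream database.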
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    pub high_watermark: LabelGuardedIntGaugeVec,
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
    pub pg_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
    pub sqlserver_cdc_upstream_min_lsn: LabelGuardedIntGaugeVec,
    pub sqlserver_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
}

pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));

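// `new` registers every gauge exactly once against the given registry; the `LazyLock`
// static above does so lazily on first access, and `Default` / `unused()` hand out
// clones of that global handle instead of re-registering.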
impl EnumeratorMetrics {
    fn new(registry: &Registry) -> Self {
        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
            "source_kafka_high_watermark",
            "High watermark for an exec per partition",
            &["source_id", "partition"],
            registry,
        )
        .unwrap();

        let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
            "pg_cdc_confirmed_flush_lsn",
            "PostgreSQL CDC confirmed flush LSN",
            &["source_id", "slot_name"],
            registry,
        )
        .unwrap();

        let pg_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
            "pg_cdc_upstream_max_lsn",
            "PostgreSQL CDC upstream max LSN (pg_current_wal_lsn)",
            &["source_id", "slot_name"],
            registry,
        )
        .unwrap();

        let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
            "mysql_cdc_binlog_file_seq_min",
            "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
            &["hostname", "port"],
            registry,
        )
        .unwrap();

        let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
            "mysql_cdc_binlog_file_seq_max",
            "MySQL CDC upstream binlog file sequence number (maximum/newest)",
            &["hostname", "port"],
            registry,
        )
        .unwrap();

        let sqlserver_cdc_upstream_min_lsn = register_guarded_int_gauge_vec_with_registry!(
            "sqlserver_cdc_upstream_min_lsn",
            "SQL Server CDC upstream minimum LSN",
            &["source_id"],
            registry,
        )
        .unwrap();

        let sqlserver_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
            "sqlserver_cdc_upstream_max_lsn",
            "SQL Server CDC upstream maximum LSN",
            &["source_id"],
            registry,
        )
        .unwrap();

        EnumeratorMetrics {
            high_watermark,
            pg_cdc_confirmed_flush_lsn,
            pg_cdc_upstream_max_lsn,
            mysql_cdc_binlog_file_seq_min,
            mysql_cdc_binlog_file_seq_max,
            sqlserver_cdc_upstream_min_lsn,
            sqlserver_cdc_upstream_max_lsn,
        }
    }

    pub fn unused() -> Self {
        Default::default()
    }
}

impl Default for EnumeratorMetrics {
    fn default() -> Self {
        GLOBAL_ENUMERATOR_METRICS.clone()
    }
}

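/// Metrics reported by running source executors: per-partition throughput counters,
/// connector-specific counters and histograms (file source, Kinesis, CDC event lag,
/// native rdkafka stats), and ack-failure bookkeeping.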
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    pub partition_input_count: LabelGuardedIntCounterVec,

    pub partition_input_bytes: LabelGuardedIntCounterVec,
    pub latest_message_id: LabelGuardedIntGaugeVec,
    pub partition_eof_count: LabelGuardedIntCounterVec,
    pub partition_eof_offset: LabelGuardedIntGaugeVec,
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    pub file_source_input_row_count: LabelGuardedIntCounterVec,
    pub file_source_dirty_split_count: LabelGuardedIntGaugeVec,
    pub file_source_failed_split_count: LabelGuardedIntCounterVec,

    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,

    /// Plain (non-guarded) counter, labeled by source name / connector type / error type.
    pub connector_ack_failure_count: IntCounterVec,
}

pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));

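// Usage sketch: call sites clone `GLOBAL_SOURCE_METRICS` via `Default` and record
// through the guarded accessors, e.g.
//   metrics
//       .partition_input_count
//       .with_guarded_label_values(&[&actor_id, &source_id, &partition, &source_name, &fragment_id])
//       .inc_by(rows);
// `with_guarded_label_values` is assumed here to be the `risingwave_common` guarded-vec
// accessor; the guard it returns is what lets stale label sets be cleaned up.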
impl SourceMetrics {
    fn new(registry: &Registry) -> Self {
        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
            "source_partition_input_count",
            "Total number of rows that have been input from a specific partition",
            &[
                "actor_id",
                "source_id",
                "partition",
                "source_name",
                "fragment_id"
            ],
            registry
        )
        .unwrap();
        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
            "source_partition_input_bytes",
            "Total bytes that have been input from a specific partition",
            &[
                "actor_id",
                "source_id",
                "partition",
                "source_name",
                "fragment_id"
            ],
            registry
        )
        .unwrap();
        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
            "source_latest_message_id",
            "Latest message id for an exec per partition",
            &["source_id", "actor_id", "partition"],
            registry,
        )
        .unwrap();
        let partition_eof_count = register_guarded_int_counter_vec_with_registry!(
            "source_partition_eof_count",
            "Total number of EOF events received from a specific partition",
            &["source_id", "partition", "source_name", "fragment_id"],
            registry
        )
        .unwrap();
        let partition_eof_offset = register_guarded_int_gauge_vec_with_registry!(
            "source_partition_eof_offset",
            "Latest resolved EOF offset for a specific partition",
            &["source_id", "partition", "source_name", "fragment_id"],
            registry
        )
        .unwrap();

        let opts = histogram_opts!(
            "source_cdc_event_lag_duration_milliseconds",
            "source_cdc_lag_latency",
            exponential_buckets(1.0, 2.0, 21).unwrap(), // 1 ms up to ~1048 s
        );

        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
            "parquet_source_skip_row_count",
            "Total number of rows that have been set to null in parquet source",
            &["actor_id", "source_id", "source_name", "fragment_id"],
            registry
        )
        .unwrap();

        let direct_cdc_event_lag_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();

        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));

        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
            "file_source_input_row_count",
            "Total number of rows that have been read in file source",
            &["source_id", "source_name", "actor_id", "fragment_id"],
            registry
        )
        .unwrap();
        let file_source_dirty_split_count = register_guarded_int_gauge_vec_with_registry!(
            "file_source_dirty_split_count",
            "Current number of dirty file splits in file source",
            &["source_id", "source_name", "actor_id", "fragment_id"],
            registry
        )
        .unwrap();
        let file_source_failed_split_count = register_guarded_int_counter_vec_with_registry!(
            "file_source_failed_split_count",
            "Total number of file splits marked dirty in file source",
            &["source_id", "source_name", "actor_id", "fragment_id"],
            registry
        )
        .unwrap();

        let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_throughput_exceeded_count",
            "Total number of times the throughput limit was exceeded in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_timeout_count",
            "Total number of timeouts in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_rebuild_shard_iter_count",
            "Total number of times the shard iterator was rebuilt in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_early_terminate_shard_count",
            "Total number of times a shard was terminated early in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
            "kinesis_lag_latency_ms",
            "Lag latency in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let connector_ack_failure_count = register_int_counter_vec_with_registry!(
            "source_connector_ack_failure_count",
            "Total number of ack failures during checkpoint for source connectors",
            &["source_name", "connector_type", "error_type"],
            registry
        )
        .unwrap();

        SourceMetrics {
            partition_input_count,
            partition_input_bytes,
            latest_message_id,
            partition_eof_count,
            partition_eof_offset,
            rdkafka_native_metric,
            direct_cdc_event_lag_latency,
            parquet_source_skip_row_count,
            file_source_input_row_count,
            file_source_dirty_split_count,
            file_source_failed_split_count,

            kinesis_throughput_exceeded_count,
            kinesis_timeout_count,
            kinesis_rebuild_shard_iter_count,
            kinesis_early_terminate_shard_count,
            kinesis_lag_latency_ms,

            connector_ack_failure_count,
        }
    }
}

impl Default for SourceMetrics {
    fn default() -> Self {
        GLOBAL_SOURCE_METRICS.clone()
    }
}
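
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sanity-check sketch: `Default` hands out clones of the lazily
    // initialized globals, so constructing both metric sets should neither panic
    // nor attempt a second registration against the global registry.
    #[test]
    fn default_metrics_clone_the_globals() {
        let _source = SourceMetrics::default();
        let _enumerator = EnumeratorMetrics::unused();
    }
}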