1use std::sync::{Arc, LazyLock};
16
17use prometheus::{
18 IntCounterVec, Registry, exponential_buckets, histogram_opts,
19 register_int_counter_vec_with_registry,
20};
21use risingwave_common::metrics::{
22 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
23};
24use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
25use risingwave_common::{
26 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
27 register_guarded_int_gauge_vec_with_registry,
28};
29
30use crate::source::kafka::stats::RdKafkaStats;
31
/// Bounded set of categories used to classify connector ack failures.
///
/// The category is exported as the `error_type` label of the
/// `source_connector_ack_failure_count` counter. Because every variant maps to a
/// fixed `&'static str`, that label cannot grow in cardinality.
#[derive(Debug, Clone, Copy)]
pub enum ConnectorAckFailureType {
    /// Reported as `"error"`.
    Error,
    /// Reported as `"timeout"`.
    Timeout,
    /// Reported as `"empty_message_id"`.
    EmptyMessageId,
    /// Reported as `"channel_missing"`.
    ChannelMissing,
    /// Reported as `"channel_send_error"`.
    ChannelSendError,
    /// Reported as `"decode_error"`.
    DecodeError,
    /// Reported as `"broker_error"`.
    BrokerError,
}

impl ConnectorAckFailureType {
    /// Returns the static Prometheus label value for this failure category.
    pub fn as_str(self) -> &'static str {
        match self {
            ConnectorAckFailureType::Error => "error",
            ConnectorAckFailureType::Timeout => "timeout",
            ConnectorAckFailureType::EmptyMessageId => "empty_message_id",
            ConnectorAckFailureType::ChannelMissing => "channel_missing",
            ConnectorAckFailureType::ChannelSendError => "channel_send_error",
            ConnectorAckFailureType::DecodeError => "decode_error",
            ConnectorAckFailureType::BrokerError => "broker_error",
        }
    }
}
60
61#[derive(Debug, Clone)]
62pub struct EnumeratorMetrics {
63 pub high_watermark: LabelGuardedIntGaugeVec,
64 pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
66 pub pg_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
68 pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
70 pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
72 pub sqlserver_cdc_upstream_min_lsn: LabelGuardedIntGaugeVec,
74 pub sqlserver_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
76}
77
78pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
79 LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
80
81impl EnumeratorMetrics {
82 fn new(registry: &Registry) -> Self {
83 let high_watermark = register_guarded_int_gauge_vec_with_registry!(
84 "source_kafka_high_watermark",
85 "High watermark for a exec per partition",
86 &["source_id", "partition"],
87 registry,
88 )
89 .unwrap();
90
91 let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
92 "pg_cdc_confirmed_flush_lsn",
93 "PostgreSQL CDC confirmed flush LSN",
94 &["source_id", "slot_name"],
95 registry,
96 )
97 .unwrap();
98
99 let pg_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
100 "pg_cdc_upstream_max_lsn",
101 "PostgreSQL CDC upstream max LSN (pg_current_wal_lsn)",
102 &["source_id", "slot_name"],
103 registry,
104 )
105 .unwrap();
106
107 let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
108 "mysql_cdc_binlog_file_seq_min",
109 "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
110 &["hostname", "port"],
111 registry,
112 )
113 .unwrap();
114
115 let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
116 "mysql_cdc_binlog_file_seq_max",
117 "MySQL CDC upstream binlog file sequence number (maximum/newest)",
118 &["hostname", "port"],
119 registry,
120 )
121 .unwrap();
122
123 let sqlserver_cdc_upstream_min_lsn = register_guarded_int_gauge_vec_with_registry!(
124 "sqlserver_cdc_upstream_min_lsn",
125 "SQL Server CDC upstream minimum LSN",
126 &["source_id"],
127 registry,
128 )
129 .unwrap();
130
131 let sqlserver_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
132 "sqlserver_cdc_upstream_max_lsn",
133 "SQL Server CDC upstream maximum LSN",
134 &["source_id"],
135 registry,
136 )
137 .unwrap();
138
139 EnumeratorMetrics {
140 high_watermark,
141 pg_cdc_confirmed_flush_lsn,
142 pg_cdc_upstream_max_lsn,
143 mysql_cdc_binlog_file_seq_min,
144 mysql_cdc_binlog_file_seq_max,
145 sqlserver_cdc_upstream_min_lsn,
146 sqlserver_cdc_upstream_max_lsn,
147 }
148 }
149
150 pub fn unused() -> Self {
151 Default::default()
152 }
153}
154
155impl Default for EnumeratorMetrics {
156 fn default() -> Self {
157 GLOBAL_ENUMERATOR_METRICS.clone()
158 }
159}
160
161#[derive(Debug, Clone)]
162pub struct SourceMetrics {
163 pub partition_input_count: LabelGuardedIntCounterVec,
164
165 pub partition_input_bytes: LabelGuardedIntCounterVec,
168 pub latest_message_id: LabelGuardedIntGaugeVec,
170 pub partition_eof_count: LabelGuardedIntCounterVec,
171 pub partition_eof_offset: LabelGuardedIntGaugeVec,
172 pub rdkafka_native_metric: Arc<RdKafkaStats>,
173
174 pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,
175
176 pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
177 pub file_source_input_row_count: LabelGuardedIntCounterVec,
178 pub file_source_dirty_split_count: LabelGuardedIntGaugeVec,
179 pub file_source_failed_split_count: LabelGuardedIntCounterVec,
180
181 pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
183 pub kinesis_timeout_count: LabelGuardedIntCounterVec,
184 pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
185 pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
186 pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,
187
188 connector_ack_failure_count: IntCounterVec,
190 connector_ack_success_count: IntCounterVec,
192}
193
194pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
195 LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
196
197impl SourceMetrics {
198 pub fn inc_connector_ack_failure_count(
199 &self,
200 source_name: &str,
201 connector_type: &'static str,
202 failure_type: ConnectorAckFailureType,
203 ) {
204 self.connector_ack_failure_count
205 .with_label_values(&[source_name, connector_type, failure_type.as_str()])
206 .inc();
207 }
208
209 pub fn inc_connector_ack_success_count(&self, source_name: &str, connector_type: &'static str) {
210 self.connector_ack_success_count
211 .with_label_values(&[source_name, connector_type])
212 .inc();
213 }
214
215 fn new(registry: &Registry) -> Self {
216 let partition_input_count = register_guarded_int_counter_vec_with_registry!(
217 "source_partition_input_count",
218 "Total number of rows that have been input from specific partition",
219 &[
220 "actor_id",
221 "source_id",
222 "partition",
223 "source_name",
224 "fragment_id"
225 ],
226 registry
227 )
228 .unwrap();
229 let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
230 "source_partition_input_bytes",
231 "Total bytes that have been input from specific partition",
232 &[
233 "actor_id",
234 "source_id",
235 "partition",
236 "source_name",
237 "fragment_id"
238 ],
239 registry
240 )
241 .unwrap();
242 let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
243 "source_latest_message_id",
244 "Latest message id for a exec per partition",
245 &["source_id", "actor_id", "partition"],
246 registry,
247 )
248 .unwrap();
249 let partition_eof_count = register_guarded_int_counter_vec_with_registry!(
250 "source_partition_eof_count",
251 "Total number of EOF events received from specific partition",
252 &["source_id", "partition", "source_name", "fragment_id"],
253 registry
254 )
255 .unwrap();
256 let partition_eof_offset = register_guarded_int_gauge_vec_with_registry!(
257 "source_partition_eof_offset",
258 "Latest resolved EOF offset for specific partition",
259 &["source_id", "partition", "source_name", "fragment_id"],
260 registry
261 )
262 .unwrap();
263
264 let opts = histogram_opts!(
265 "source_cdc_event_lag_duration_milliseconds",
266 "source_cdc_lag_latency",
267 exponential_buckets(1.0, 2.0, 21).unwrap(), );
269
270 let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
271 "parquet_source_skip_row_count",
272 "Total number of rows that have been set to null in parquet source",
273 &["actor_id", "source_id", "source_name", "fragment_id"],
274 registry
275 )
276 .unwrap();
277
278 let direct_cdc_event_lag_latency =
279 register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
280
281 let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
282
283 let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
284 "file_source_input_row_count",
285 "Total number of rows that have been read in file source",
286 &["source_id", "source_name", "actor_id", "fragment_id"],
287 registry
288 )
289 .unwrap();
290 let file_source_dirty_split_count = register_guarded_int_gauge_vec_with_registry!(
291 "file_source_dirty_split_count",
292 "Current number of dirty file splits in file source",
293 &["source_id", "source_name", "actor_id", "fragment_id"],
294 registry
295 )
296 .unwrap();
297 let file_source_failed_split_count = register_guarded_int_counter_vec_with_registry!(
298 "file_source_failed_split_count",
299 "Total number of file splits marked dirty in file source",
300 &["source_id", "source_name", "actor_id", "fragment_id"],
301 registry
302 )
303 .unwrap();
304
305 let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
306 "kinesis_throughput_exceeded_count",
307 "Total number of times throughput exceeded in kinesis source",
308 &["source_id", "source_name", "fragment_id", "shard_id"],
309 registry
310 )
311 .unwrap();
312
313 let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
314 "kinesis_timeout_count",
315 "Total number of times timeout in kinesis source",
316 &["source_id", "source_name", "fragment_id", "shard_id"],
317 registry
318 )
319 .unwrap();
320
321 let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
322 "kinesis_rebuild_shard_iter_count",
323 "Total number of times rebuild shard iter in kinesis source",
324 &["source_id", "source_name", "fragment_id", "shard_id"],
325 registry
326 )
327 .unwrap();
328
329 let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
330 "kinesis_early_terminate_shard_count",
331 "Total number of times early terminate shard in kinesis source",
332 &["source_id", "source_name", "fragment_id", "shard_id"],
333 registry
334 )
335 .unwrap();
336
337 let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
338 "kinesis_lag_latency_ms",
339 "Lag latency in kinesis source",
340 &["source_id", "source_name", "fragment_id", "shard_id"],
341 registry
342 )
343 .unwrap();
344
345 let connector_ack_failure_count = register_int_counter_vec_with_registry!(
346 "source_connector_ack_failure_count",
347 "Total number of connector ack failures after checkpoint commit by bounded failure category",
348 &["source_name", "connector_type", "error_type"],
349 registry
350 )
351 .unwrap();
352 let connector_ack_success_count = register_int_counter_vec_with_registry!(
353 "source_connector_ack_success_count",
354 "Total number of successful connector acks after checkpoint commit",
355 &["source_name", "connector_type"],
356 registry
357 )
358 .unwrap();
359
360 SourceMetrics {
361 partition_input_count,
362 partition_input_bytes,
363 latest_message_id,
364 partition_eof_count,
365 partition_eof_offset,
366 rdkafka_native_metric,
367 direct_cdc_event_lag_latency,
368 parquet_source_skip_row_count,
369 file_source_input_row_count,
370 file_source_dirty_split_count,
371 file_source_failed_split_count,
372
373 kinesis_throughput_exceeded_count,
374 kinesis_timeout_count,
375 kinesis_rebuild_shard_iter_count,
376 kinesis_early_terminate_shard_count,
377 kinesis_lag_latency_ms,
378
379 connector_ack_failure_count,
380 connector_ack_success_count,
381 }
382 }
383}
384
385impl Default for SourceMetrics {
386 fn default() -> Self {
387 GLOBAL_SOURCE_METRICS.clone()
388 }
389}