1use std::sync::{Arc, LazyLock};
16
17use prometheus::{
18 IntCounterVec, Registry, exponential_buckets, histogram_opts,
19 register_int_counter_vec_with_registry,
20};
21use risingwave_common::metrics::{
22 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
23};
24use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
25use risingwave_common::{
26 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
27 register_guarded_int_gauge_vec_with_registry,
28};
29
30use crate::source::kafka::stats::RdKafkaStats;
31
/// Metrics reported by source split enumerators.
///
/// All fields are label-guarded vectors; the label sets below mirror the
/// registrations in [`EnumeratorMetrics::new`].
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    // Kafka high watermark, labeled by ("source_id", "partition").
    pub high_watermark: LabelGuardedIntGaugeVec,
    // PostgreSQL CDC confirmed flush LSN, labeled by ("source_id", "slot_name").
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
    // PostgreSQL CDC upstream max LSN (pg_current_wal_lsn), labeled by ("source_id", "slot_name").
    pub pg_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
    // MySQL CDC oldest upstream binlog file sequence number, labeled by ("hostname", "port").
    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
    // MySQL CDC newest upstream binlog file sequence number, labeled by ("hostname", "port").
    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
    // SQL Server CDC upstream minimum LSN, labeled by ("source_id",).
    pub sqlserver_cdc_upstream_min_lsn: LabelGuardedIntGaugeVec,
    // SQL Server CDC upstream maximum LSN, labeled by ("source_id",).
    pub sqlserver_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
}
48
/// Process-wide [`EnumeratorMetrics`], lazily registered against the global
/// metrics registry on first access.
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
51
52impl EnumeratorMetrics {
53 fn new(registry: &Registry) -> Self {
54 let high_watermark = register_guarded_int_gauge_vec_with_registry!(
55 "source_kafka_high_watermark",
56 "High watermark for a exec per partition",
57 &["source_id", "partition"],
58 registry,
59 )
60 .unwrap();
61
62 let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
63 "pg_cdc_confirmed_flush_lsn",
64 "PostgreSQL CDC confirmed flush LSN",
65 &["source_id", "slot_name"],
66 registry,
67 )
68 .unwrap();
69
70 let pg_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
71 "pg_cdc_upstream_max_lsn",
72 "PostgreSQL CDC upstream max LSN (pg_current_wal_lsn)",
73 &["source_id", "slot_name"],
74 registry,
75 )
76 .unwrap();
77
78 let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
79 "mysql_cdc_binlog_file_seq_min",
80 "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
81 &["hostname", "port"],
82 registry,
83 )
84 .unwrap();
85
86 let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
87 "mysql_cdc_binlog_file_seq_max",
88 "MySQL CDC upstream binlog file sequence number (maximum/newest)",
89 &["hostname", "port"],
90 registry,
91 )
92 .unwrap();
93
94 let sqlserver_cdc_upstream_min_lsn = register_guarded_int_gauge_vec_with_registry!(
95 "sqlserver_cdc_upstream_min_lsn",
96 "SQL Server CDC upstream minimum LSN",
97 &["source_id"],
98 registry,
99 )
100 .unwrap();
101
102 let sqlserver_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
103 "sqlserver_cdc_upstream_max_lsn",
104 "SQL Server CDC upstream maximum LSN",
105 &["source_id"],
106 registry,
107 )
108 .unwrap();
109
110 EnumeratorMetrics {
111 high_watermark,
112 pg_cdc_confirmed_flush_lsn,
113 pg_cdc_upstream_max_lsn,
114 mysql_cdc_binlog_file_seq_min,
115 mysql_cdc_binlog_file_seq_max,
116 sqlserver_cdc_upstream_min_lsn,
117 sqlserver_cdc_upstream_max_lsn,
118 }
119 }
120
121 pub fn unused() -> Self {
122 Default::default()
123 }
124}
125
impl Default for EnumeratorMetrics {
    fn default() -> Self {
        // `Default` hands out a clone of the process-wide instance rather than
        // registering a fresh set of metrics, so every default-constructed
        // value reports into the same global series.
        GLOBAL_ENUMERATOR_METRICS.clone()
    }
}
131
/// Metrics reported by source executors/readers.
///
/// Label sets below mirror the registrations in [`SourceMetrics::new`].
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    // Rows read per partition, labeled by
    // ("actor_id", "source_id", "partition", "source_name", "fragment_id").
    pub partition_input_count: LabelGuardedIntCounterVec,

    // Bytes read per partition; same labels as `partition_input_count`.
    pub partition_input_bytes: LabelGuardedIntCounterVec,
    // Latest message id, labeled by ("source_id", "actor_id", "partition").
    pub latest_message_id: LabelGuardedIntGaugeVec,
    // Native statistics emitted by the rdkafka client, registered separately.
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    // CDC event lag histogram (milliseconds), labeled by ("table_name",).
    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    // Rows nulled out while reading parquet files.
    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    // Rows read by the file source.
    pub file_source_input_row_count: LabelGuardedIntCounterVec,
    // Current number of dirty file splits (gauge, not counter).
    pub file_source_dirty_split_count: LabelGuardedIntGaugeVec,
    // Total file splits ever marked dirty.
    pub file_source_failed_split_count: LabelGuardedIntCounterVec,

    // Kinesis-specific counters/histogram, all labeled by
    // ("source_id", "source_name", "fragment_id", "shard_id").
    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,

    // Ack failures during checkpoint, labeled by
    // ("source_name", "connector_type", "error_type"). Plain (non-guarded)
    // counter vec, unlike the fields above.
    pub connector_ack_failure_count: IntCounterVec,
}
160
/// Process-wide [`SourceMetrics`], lazily registered against the global
/// metrics registry on first access.
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
163
164impl SourceMetrics {
165 fn new(registry: &Registry) -> Self {
166 let partition_input_count = register_guarded_int_counter_vec_with_registry!(
167 "source_partition_input_count",
168 "Total number of rows that have been input from specific partition",
169 &[
170 "actor_id",
171 "source_id",
172 "partition",
173 "source_name",
174 "fragment_id"
175 ],
176 registry
177 )
178 .unwrap();
179 let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
180 "source_partition_input_bytes",
181 "Total bytes that have been input from specific partition",
182 &[
183 "actor_id",
184 "source_id",
185 "partition",
186 "source_name",
187 "fragment_id"
188 ],
189 registry
190 )
191 .unwrap();
192 let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
193 "source_latest_message_id",
194 "Latest message id for a exec per partition",
195 &["source_id", "actor_id", "partition"],
196 registry,
197 )
198 .unwrap();
199
200 let opts = histogram_opts!(
201 "source_cdc_event_lag_duration_milliseconds",
202 "source_cdc_lag_latency",
203 exponential_buckets(1.0, 2.0, 21).unwrap(), );
205
206 let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
207 "parquet_source_skip_row_count",
208 "Total number of rows that have been set to null in parquet source",
209 &["actor_id", "source_id", "source_name", "fragment_id"],
210 registry
211 )
212 .unwrap();
213
214 let direct_cdc_event_lag_latency =
215 register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
216
217 let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
218
219 let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
220 "file_source_input_row_count",
221 "Total number of rows that have been read in file source",
222 &["source_id", "source_name", "actor_id", "fragment_id"],
223 registry
224 )
225 .unwrap();
226 let file_source_dirty_split_count = register_guarded_int_gauge_vec_with_registry!(
227 "file_source_dirty_split_count",
228 "Current number of dirty file splits in file source",
229 &["source_id", "source_name", "actor_id", "fragment_id"],
230 registry
231 )
232 .unwrap();
233 let file_source_failed_split_count = register_guarded_int_counter_vec_with_registry!(
234 "file_source_failed_split_count",
235 "Total number of file splits marked dirty in file source",
236 &["source_id", "source_name", "actor_id", "fragment_id"],
237 registry
238 )
239 .unwrap();
240
241 let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
242 "kinesis_throughput_exceeded_count",
243 "Total number of times throughput exceeded in kinesis source",
244 &["source_id", "source_name", "fragment_id", "shard_id"],
245 registry
246 )
247 .unwrap();
248
249 let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
250 "kinesis_timeout_count",
251 "Total number of times timeout in kinesis source",
252 &["source_id", "source_name", "fragment_id", "shard_id"],
253 registry
254 )
255 .unwrap();
256
257 let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
258 "kinesis_rebuild_shard_iter_count",
259 "Total number of times rebuild shard iter in kinesis source",
260 &["source_id", "source_name", "fragment_id", "shard_id"],
261 registry
262 )
263 .unwrap();
264
265 let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
266 "kinesis_early_terminate_shard_count",
267 "Total number of times early terminate shard in kinesis source",
268 &["source_id", "source_name", "fragment_id", "shard_id"],
269 registry
270 )
271 .unwrap();
272
273 let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
274 "kinesis_lag_latency_ms",
275 "Lag latency in kinesis source",
276 &["source_id", "source_name", "fragment_id", "shard_id"],
277 registry
278 )
279 .unwrap();
280
281 let connector_ack_failure_count = register_int_counter_vec_with_registry!(
282 "source_connector_ack_failure_count",
283 "Total number of ack failures during checkpoint for source connectors",
284 &["source_name", "connector_type", "error_type"],
285 registry
286 )
287 .unwrap();
288
289 SourceMetrics {
290 partition_input_count,
291 partition_input_bytes,
292 latest_message_id,
293 rdkafka_native_metric,
294 direct_cdc_event_lag_latency,
295 parquet_source_skip_row_count,
296 file_source_input_row_count,
297 file_source_dirty_split_count,
298 file_source_failed_split_count,
299
300 kinesis_throughput_exceeded_count,
301 kinesis_timeout_count,
302 kinesis_rebuild_shard_iter_count,
303 kinesis_early_terminate_shard_count,
304 kinesis_lag_latency_ms,
305
306 connector_ack_failure_count,
307 }
308 }
309}
310
impl Default for SourceMetrics {
    fn default() -> Self {
        // `Default` hands out a clone of the process-wide instance rather than
        // registering a fresh set of metrics, so every default-constructed
        // value reports into the same global series.
        GLOBAL_SOURCE_METRICS.clone()
    }
}