1use std::sync::{Arc, LazyLock};
16
17use prometheus::{Registry, exponential_buckets, histogram_opts};
18use risingwave_common::metrics::{
19 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
20};
21use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
22use risingwave_common::{
23 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
24 register_guarded_int_gauge_vec_with_registry,
25};
26
27use crate::source::kafka::stats::RdKafkaStats;
28
/// Gauges reported by source split enumerators: the Kafka high watermark plus
/// upstream CDC progress for PostgreSQL, MySQL and SQL Server.
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    /// Kafka high watermark, labeled by `source_id` and `partition`.
    pub high_watermark: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC confirmed flush LSN, labeled by `source_id` and `slot_name`.
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC upstream max LSN (`pg_current_wal_lsn`), labeled by
    /// `source_id` and `slot_name`.
    pub pg_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
    /// MySQL CDC upstream binlog file sequence number (minimum/oldest),
    /// labeled by `hostname` and `port`.
    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
    /// MySQL CDC upstream binlog file sequence number (maximum/newest),
    /// labeled by `hostname` and `port`.
    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
    /// SQL Server CDC upstream minimum LSN, labeled by `source_id`.
    pub sqlserver_cdc_upstream_min_lsn: LabelGuardedIntGaugeVec,
    /// SQL Server CDC upstream maximum LSN, labeled by `source_id`.
    pub sqlserver_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
}
45
/// Process-wide [`EnumeratorMetrics`], lazily registered into the global
/// metrics registry on first access.
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
48
49impl EnumeratorMetrics {
50 fn new(registry: &Registry) -> Self {
51 let high_watermark = register_guarded_int_gauge_vec_with_registry!(
52 "source_kafka_high_watermark",
53 "High watermark for a exec per partition",
54 &["source_id", "partition"],
55 registry,
56 )
57 .unwrap();
58
59 let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
60 "pg_cdc_confirmed_flush_lsn",
61 "PostgreSQL CDC confirmed flush LSN",
62 &["source_id", "slot_name"],
63 registry,
64 )
65 .unwrap();
66
67 let pg_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
68 "pg_cdc_upstream_max_lsn",
69 "PostgreSQL CDC upstream max LSN (pg_current_wal_lsn)",
70 &["source_id", "slot_name"],
71 registry,
72 )
73 .unwrap();
74
75 let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
76 "mysql_cdc_binlog_file_seq_min",
77 "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
78 &["hostname", "port"],
79 registry,
80 )
81 .unwrap();
82
83 let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
84 "mysql_cdc_binlog_file_seq_max",
85 "MySQL CDC upstream binlog file sequence number (maximum/newest)",
86 &["hostname", "port"],
87 registry,
88 )
89 .unwrap();
90
91 let sqlserver_cdc_upstream_min_lsn = register_guarded_int_gauge_vec_with_registry!(
92 "sqlserver_cdc_upstream_min_lsn",
93 "SQL Server CDC upstream minimum LSN",
94 &["source_id"],
95 registry,
96 )
97 .unwrap();
98
99 let sqlserver_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
100 "sqlserver_cdc_upstream_max_lsn",
101 "SQL Server CDC upstream maximum LSN",
102 &["source_id"],
103 registry,
104 )
105 .unwrap();
106
107 EnumeratorMetrics {
108 high_watermark,
109 pg_cdc_confirmed_flush_lsn,
110 pg_cdc_upstream_max_lsn,
111 mysql_cdc_binlog_file_seq_min,
112 mysql_cdc_binlog_file_seq_max,
113 sqlserver_cdc_upstream_min_lsn,
114 sqlserver_cdc_upstream_max_lsn,
115 }
116 }
117
118 pub fn unused() -> Self {
119 Default::default()
120 }
121}
122
123impl Default for EnumeratorMetrics {
124 fn default() -> Self {
125 GLOBAL_ENUMERATOR_METRICS.clone()
126 }
127}
128
/// Metrics reported by running source readers, plus native rdkafka
/// statistics.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    /// Rows read from a specific partition, labeled by
    /// actor/source/partition/source name/fragment.
    pub partition_input_count: LabelGuardedIntCounterVec,

    /// Bytes read from a specific partition, same labels as
    /// `partition_input_count`.
    pub partition_input_bytes: LabelGuardedIntCounterVec,
    /// Latest message id observed, labeled by `source_id`, `actor_id`, `partition`.
    pub latest_message_id: LabelGuardedIntGaugeVec,
    /// Native statistics exported by the rdkafka client (see [`RdKafkaStats`]).
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    /// CDC event lag histogram in milliseconds, labeled by `table_name`.
    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    /// Rows set to null (skipped) while reading parquet files.
    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    /// Rows read by the file source.
    pub file_source_input_row_count: LabelGuardedIntCounterVec,
    /// Current number of dirty file splits in the file source.
    pub file_source_dirty_split_count: LabelGuardedIntGaugeVec,
    /// Total number of file splits marked dirty in the file source.
    pub file_source_failed_split_count: LabelGuardedIntCounterVec,

    // Kinesis-specific metrics, all labeled by
    // `source_id`/`source_name`/`fragment_id`/`shard_id`.
    /// Times the Kinesis read throughput limit was exceeded.
    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    /// Times a Kinesis request timed out.
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    /// Times a Kinesis shard iterator was rebuilt.
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    /// Times a Kinesis shard read was terminated early.
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    /// Kinesis consumer lag latency histogram (milliseconds).
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,
}
154
/// Process-wide [`SourceMetrics`], lazily registered into the global metrics
/// registry on first access.
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
157
impl SourceMetrics {
    /// Registers all source-reader metrics into `registry` and returns the
    /// assembled set. Registration failures (duplicate metric names) are bugs
    /// and panic via `unwrap`.
    fn new(registry: &Registry) -> Self {
        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
            "source_partition_input_count",
            "Total number of rows that have been input from specific partition",
            &[
                "actor_id",
                "source_id",
                "partition",
                "source_name",
                "fragment_id"
            ],
            registry
        )
        .unwrap();
        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
            "source_partition_input_bytes",
            "Total bytes that have been input from specific partition",
            &[
                "actor_id",
                "source_id",
                "partition",
                "source_name",
                "fragment_id"
            ],
            registry
        )
        .unwrap();
        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
            "source_latest_message_id",
            "Latest message id for a exec per partition",
            &["source_id", "actor_id", "partition"],
            registry,
        )
        .unwrap();

        // 21 exponential buckets doubling from 1 ms, topping out around
        // 2^20 ms (~17.5 min).
        // NOTE(review): the help string below ("source_cdc_lag_latency") reads
        // like a metric name rather than a description — confirm intended.
        let opts = histogram_opts!(
            "source_cdc_event_lag_duration_milliseconds",
            "source_cdc_lag_latency",
            exponential_buckets(1.0, 2.0, 21).unwrap(),
        );

        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
            "parquet_source_skip_row_count",
            "Total number of rows that have been set to null in parquet source",
            &["actor_id", "source_id", "source_name", "fragment_id"],
            registry
        )
        .unwrap();

        let direct_cdc_event_lag_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();

        // rdkafka native statistics register their own collectors, so they
        // take an owned handle to the registry.
        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));

        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
            "file_source_input_row_count",
            "Total number of rows that have been read in file source",
            &["source_id", "source_name", "actor_id", "fragment_id"],
            registry
        )
        .unwrap();
        let file_source_dirty_split_count = register_guarded_int_gauge_vec_with_registry!(
            "file_source_dirty_split_count",
            "Current number of dirty file splits in file source",
            &["source_id", "source_name", "actor_id", "fragment_id"],
            registry
        )
        .unwrap();
        let file_source_failed_split_count = register_guarded_int_counter_vec_with_registry!(
            "file_source_failed_split_count",
            "Total number of file splits marked dirty in file source",
            &["source_id", "source_name", "actor_id", "fragment_id"],
            registry
        )
        .unwrap();

        // Kinesis-specific metrics, all sharing the same label set.
        let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_throughput_exceeded_count",
            "Total number of times throughput exceeded in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_timeout_count",
            "Total number of times timeout in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_rebuild_shard_iter_count",
            "Total number of times rebuild shard iter in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_early_terminate_shard_count",
            "Total number of times early terminate shard in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
            "kinesis_lag_latency_ms",
            "Lag latency in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        SourceMetrics {
            partition_input_count,
            partition_input_bytes,
            latest_message_id,
            rdkafka_native_metric,
            direct_cdc_event_lag_latency,
            parquet_source_skip_row_count,
            file_source_input_row_count,
            file_source_dirty_split_count,
            file_source_failed_split_count,

            kinesis_throughput_exceeded_count,
            kinesis_timeout_count,
            kinesis_rebuild_shard_iter_count,
            kinesis_early_terminate_shard_count,
            kinesis_lag_latency_ms,
        }
    }
}
294
295impl Default for SourceMetrics {
296 fn default() -> Self {
297 GLOBAL_SOURCE_METRICS.clone()
298 }
299}