// risingwave_connector/source/monitor/metrics.rs

use std::sync::{Arc, LazyLock};

use prometheus::{Registry, exponential_buckets, histogram_opts};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};

use crate::source::kafka::stats::RdKafkaStats;

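/// Metrics reported by source split enumerators: the Kafka partition high
/// watermark, plus CDC upstream progress (PostgreSQL confirmed flush LSN and
/// the MySQL binlog file sequence range).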
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    /// Kafka high watermark per partition.
    pub high_watermark: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC confirmed flush LSN per replication slot.
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
    /// Oldest binlog file sequence number kept by upstream MySQL.
    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
    /// Newest binlog file sequence number kept by upstream MySQL.
    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
}

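/// Process-wide [`EnumeratorMetrics`], lazily registered to the global
/// Prometheus registry on first access.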
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl EnumeratorMetrics {
    fn new(registry: &Registry) -> Self {
        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
            "source_kafka_high_watermark",
            "High watermark for an exec per partition",
            &["source_id", "partition"],
            registry,
        )
        .unwrap();

        let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
            "pg_cdc_confirmed_flush_lsn",
            "PostgreSQL CDC confirmed flush LSN",
            &["source_id", "slot_name"],
            registry,
        )
        .unwrap();

        let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
            "mysql_cdc_binlog_file_seq_min",
            "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
            &["hostname", "port"],
            registry,
        )
        .unwrap();

        let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
            "mysql_cdc_binlog_file_seq_max",
            "MySQL CDC upstream binlog file sequence number (maximum/newest)",
            &["hostname", "port"],
            registry,
        )
        .unwrap();

        EnumeratorMetrics {
            high_watermark,
            pg_cdc_confirmed_flush_lsn,
            mysql_cdc_binlog_file_seq_min,
            mysql_cdc_binlog_file_seq_max,
        }
    }

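    /// Handle for call sites that do not actually report enumerator metrics;
    /// equivalent to cloning the global instance via `Default`.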
    pub fn unused() -> Self {
        Default::default()
    }
}

impl Default for EnumeratorMetrics {
    fn default() -> Self {
        GLOBAL_ENUMERATOR_METRICS.clone()
    }
}
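
// Minimal usage sketch (label values hypothetical), assuming the
// `with_guarded_label_values` accessor exposed by these label-guarded
// metric vecs:
//
//     GLOBAL_ENUMERATOR_METRICS
//         .high_watermark
//         .with_guarded_label_values(&["1", "0"]) // [source_id, partition]
//         .set(high_watermark);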

/// Metrics for source executors reading from external connectors.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    /// Rows ingested from each partition.
    pub partition_input_count: LabelGuardedIntCounterVec,

    /// Bytes ingested from each partition.
    pub partition_input_bytes: LabelGuardedIntCounterVec,
    /// Latest message id seen for each partition.
    pub latest_message_id: LabelGuardedIntGaugeVec,
    /// Native statistics reported by rdkafka.
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    /// Lag between upstream CDC event time and ingestion, in milliseconds.
    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    pub file_source_input_row_count: LabelGuardedIntCounterVec,
    pub file_source_dirty_split_count: LabelGuardedIntGaugeVec,
    pub file_source_failed_split_count: LabelGuardedIntCounterVec,

    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,
}

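/// Process-wide [`SourceMetrics`], lazily registered to the global Prometheus
/// registry on first access.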
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl SourceMetrics {
    fn new(registry: &Registry) -> Self {
        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
            "source_partition_input_count",
            "Total number of rows that have been input from a specific partition",
            &[
                "actor_id",
                "source_id",
                "partition",
                "source_name",
                "fragment_id"
            ],
            registry
        )
        .unwrap();
        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
            "source_partition_input_bytes",
            "Total bytes that have been input from a specific partition",
            &[
                "actor_id",
                "source_id",
                "partition",
                "source_name",
                "fragment_id"
            ],
            registry
        )
        .unwrap();
        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
            "source_latest_message_id",
            "Latest message id for an exec per partition",
            &["source_id", "actor_id", "partition"],
            registry,
        )
        .unwrap();

        // Exponential buckets from 1 ms up to 2^20 ms (~17.5 minutes).
        let opts = histogram_opts!(
            "source_cdc_event_lag_duration_milliseconds",
            "source_cdc_lag_latency",
            exponential_buckets(1.0, 2.0, 21).unwrap(),
        );

        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
            "parquet_source_skip_row_count",
            "Total number of rows that have been set to null in parquet source",
            &["actor_id", "source_id", "source_name", "fragment_id"],
            registry
        )
        .unwrap();

        let direct_cdc_event_lag_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();

        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));

        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
            "file_source_input_row_count",
            "Total number of rows that have been read in file source",
            &["source_id", "source_name", "actor_id", "fragment_id"],
            registry
        )
        .unwrap();
        let file_source_dirty_split_count = register_guarded_int_gauge_vec_with_registry!(
            "file_source_dirty_split_count",
            "Current number of dirty file splits in file source",
            &["source_id", "source_name", "actor_id", "fragment_id"],
            registry
        )
        .unwrap();
        let file_source_failed_split_count = register_guarded_int_counter_vec_with_registry!(
            "file_source_failed_split_count",
            "Total number of file splits marked dirty in file source",
            &["source_id", "source_name", "actor_id", "fragment_id"],
            registry
        )
        .unwrap();

        let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_throughput_exceeded_count",
            "Total number of times throughput was exceeded in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_timeout_count",
            "Total number of timeouts in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_rebuild_shard_iter_count",
            "Total number of times the shard iterator was rebuilt in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_early_terminate_shard_count",
            "Total number of times a shard was terminated early in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
            "kinesis_lag_latency_ms",
            "Lag latency in kinesis source, in milliseconds",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        SourceMetrics {
            partition_input_count,
            partition_input_bytes,
            latest_message_id,
            rdkafka_native_metric,
            direct_cdc_event_lag_latency,
            parquet_source_skip_row_count,
            file_source_input_row_count,
            file_source_dirty_split_count,
            file_source_failed_split_count,

            kinesis_throughput_exceeded_count,
            kinesis_timeout_count,
            kinesis_rebuild_shard_iter_count,
            kinesis_early_terminate_shard_count,
            kinesis_lag_latency_ms,
        }
    }
}

impl Default for SourceMetrics {
    fn default() -> Self {
        GLOBAL_SOURCE_METRICS.clone()
    }
}
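
// Minimal usage sketch (label values hypothetical), assuming the same
// `with_guarded_label_values` accessor as above:
//
//     let metrics = SourceMetrics::default();
//     metrics
//         .partition_input_count
//         // [actor_id, source_id, partition, source_name, fragment_id]
//         .with_guarded_label_values(&["1", "2", "0", "my_source", "3"])
//         .inc_by(num_rows);
//     metrics
//         .direct_cdc_event_lag_latency
//         .with_guarded_label_values(&["my_table"])
//         .observe(lag_ms);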