risingwave_connector/source/monitor/metrics.rs

use std::sync::{Arc, LazyLock};

use prometheus::{Registry, exponential_buckets, histogram_opts};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};

use crate::source::kafka::stats::RdKafkaStats;

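/// Metrics reported by source split enumerators: the Kafka high watermark per partition
/// and upstream CDC replication progress (PostgreSQL confirmed flush LSN, MySQL binlog
/// file sequence range).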
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    pub high_watermark: LabelGuardedIntGaugeVec,
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
}

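/// Global [`EnumeratorMetrics`] instance, lazily created and registered in the
/// process-wide Prometheus registry.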
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl EnumeratorMetrics {
    fn new(registry: &Registry) -> Self {
        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
            "source_kafka_high_watermark",
            "High watermark for an exec per partition",
            &["source_id", "partition"],
            registry,
        )
        .unwrap();

        let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
            "pg_cdc_confirmed_flush_lsn",
            "PostgreSQL CDC confirmed flush LSN",
            &["source_id", "slot_name"],
            registry,
        )
        .unwrap();

        let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
            "mysql_cdc_binlog_file_seq_min",
            "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
            &["hostname", "port"],
            registry,
        )
        .unwrap();

        let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
            "mysql_cdc_binlog_file_seq_max",
            "MySQL CDC upstream binlog file sequence number (maximum/newest)",
            &["hostname", "port"],
            registry,
        )
        .unwrap();

        EnumeratorMetrics {
            high_watermark,
            pg_cdc_confirmed_flush_lsn,
            mysql_cdc_binlog_file_seq_min,
            mysql_cdc_binlog_file_seq_max,
        }
    }

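    /// Creates a metrics handle for call sites that never report enumerator metrics;
    /// this is just a clone of the global instance (see the `Default` impl below).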
    pub fn unused() -> Self {
        Default::default()
    }
}

impl Default for EnumeratorMetrics {
    fn default() -> Self {
        GLOBAL_ENUMERATOR_METRICS.clone()
    }
}

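/// Metrics collected by source executors: per-partition row/byte throughput, the latest
/// consumed message id, native rdkafka statistics, CDC event lag, and connector-specific
/// counters for file and Kinesis sources.
///
/// A minimal usage sketch (assuming the `with_guarded_label_values` accessor exposed by
/// `risingwave_common::metrics` for the guarded metric vectors):
///
/// ```ignore
/// let metrics = SourceMetrics::default();
/// metrics
///     .partition_input_count
///     .with_guarded_label_values(&["actor_1", "source_2", "partition_0", "my_source", "fragment_3"])
///     .inc_by(1024);
/// ```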
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    pub partition_input_count: LabelGuardedIntCounterVec,

    pub partition_input_bytes: LabelGuardedIntCounterVec,
    pub latest_message_id: LabelGuardedIntGaugeVec,
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    pub file_source_input_row_count: LabelGuardedIntCounterVec,

    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,
}

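/// Global [`SourceMetrics`] instance, lazily created and registered in the process-wide
/// Prometheus registry.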
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl SourceMetrics {
    fn new(registry: &Registry) -> Self {
        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
            "source_partition_input_count",
            "Total number of rows that have been input from a specific partition",
            &[
                "actor_id",
                "source_id",
                "partition",
                "source_name",
                "fragment_id"
            ],
            registry
        )
        .unwrap();
        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
            "source_partition_input_bytes",
            "Total bytes that have been input from a specific partition",
            &[
                "actor_id",
                "source_id",
                "partition",
                "source_name",
                "fragment_id"
            ],
            registry
        )
        .unwrap();
        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
            "source_latest_message_id",
            "Latest message id for an exec per partition",
            &["source_id", "actor_id", "partition"],
            registry,
        )
        .unwrap();

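        // Exponential buckets covering 1 ms up to 2^20 ms (~17.5 minutes) of CDC event lag.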
        let opts = histogram_opts!(
            "source_cdc_event_lag_duration_milliseconds",
            "source_cdc_lag_latency",
            exponential_buckets(1.0, 2.0, 21).unwrap(),
        );

        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
            "parquet_source_skip_row_count",
            "Total number of rows that have been set to null in parquet source",
            &["actor_id", "source_id", "source_name", "fragment_id"],
            registry
        )
        .unwrap();

        let direct_cdc_event_lag_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();

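        // Native rdkafka statistics are collected and registered separately by `RdKafkaStats`.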
        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));

        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
            "file_source_input_row_count",
            "Total number of rows that have been read in file source",
            &["source_id", "source_name", "actor_id", "fragment_id"],
            registry
        )
        .unwrap();

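        // Kinesis-specific counters and lag histogram, all labeled per shard.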
        let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_throughput_exceeded_count",
            "Total number of times throughput was exceeded in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_timeout_count",
            "Total number of timeouts in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_rebuild_shard_iter_count",
            "Total number of times the shard iterator was rebuilt in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_early_terminate_shard_count",
            "Total number of times a shard was terminated early in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
            "kinesis_lag_latency_ms",
            "Lag latency (ms) in kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        SourceMetrics {
            partition_input_count,
            partition_input_bytes,
            latest_message_id,
            rdkafka_native_metric,
            direct_cdc_event_lag_latency,
            parquet_source_skip_row_count,
            file_source_input_row_count,

            kinesis_throughput_exceeded_count,
            kinesis_timeout_count,
            kinesis_rebuild_shard_iter_count,
            kinesis_early_terminate_shard_count,
            kinesis_lag_latency_ms,
        }
    }
}

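/// `Default` clones the lazily-initialized global instance, so every
/// `SourceMetrics::default()` handle reports into the same registered collectors.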
impl Default for SourceMetrics {
    fn default() -> Self {
        GLOBAL_SOURCE_METRICS.clone()
    }
}