risingwave_connector/source/monitor/
metrics.rs1use std::sync::{Arc, LazyLock};
16
17use prometheus::{Registry, exponential_buckets, histogram_opts};
18use risingwave_common::metrics::{
19 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
20};
21use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
22use risingwave_common::{
23 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
24 register_guarded_int_gauge_vec_with_registry,
25};
26
27use crate::source::kafka::stats::RdKafkaStats;
28
29#[derive(Debug, Clone)]
30pub struct EnumeratorMetrics {
31 pub high_watermark: LabelGuardedIntGaugeVec,
32 pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
34}
35
36pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
37 LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
38
39impl EnumeratorMetrics {
40 fn new(registry: &Registry) -> Self {
41 let high_watermark = register_guarded_int_gauge_vec_with_registry!(
42 "source_kafka_high_watermark",
43 "High watermark for a exec per partition",
44 &["source_id", "partition"],
45 registry,
46 )
47 .unwrap();
48
49 let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
50 "pg_cdc_confirmed_flush_lsn",
51 "PostgreSQL CDC confirmed flush LSN",
52 &["source_id", "slot_name"],
53 registry,
54 )
55 .unwrap();
56
57 EnumeratorMetrics {
58 high_watermark,
59 pg_cdc_confirmed_flush_lsn,
60 }
61 }
62
63 pub fn unused() -> Self {
64 Default::default()
65 }
66}
67
68impl Default for EnumeratorMetrics {
69 fn default() -> Self {
70 GLOBAL_ENUMERATOR_METRICS.clone()
71 }
72}
73
74#[derive(Debug, Clone)]
75pub struct SourceMetrics {
76 pub partition_input_count: LabelGuardedIntCounterVec,
77
78 pub partition_input_bytes: LabelGuardedIntCounterVec,
81 pub latest_message_id: LabelGuardedIntGaugeVec,
83 pub rdkafka_native_metric: Arc<RdKafkaStats>,
84
85 pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,
86
87 pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
88 pub file_source_input_row_count: LabelGuardedIntCounterVec,
89
90 pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
92 pub kinesis_timeout_count: LabelGuardedIntCounterVec,
93 pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
94 pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
95 pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,
96}
97
98pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
99 LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
100
101impl SourceMetrics {
102 fn new(registry: &Registry) -> Self {
103 let partition_input_count = register_guarded_int_counter_vec_with_registry!(
104 "source_partition_input_count",
105 "Total number of rows that have been input from specific partition",
106 &[
107 "actor_id",
108 "source_id",
109 "partition",
110 "source_name",
111 "fragment_id"
112 ],
113 registry
114 )
115 .unwrap();
116 let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
117 "source_partition_input_bytes",
118 "Total bytes that have been input from specific partition",
119 &[
120 "actor_id",
121 "source_id",
122 "partition",
123 "source_name",
124 "fragment_id"
125 ],
126 registry
127 )
128 .unwrap();
129 let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
130 "source_latest_message_id",
131 "Latest message id for a exec per partition",
132 &["source_id", "actor_id", "partition"],
133 registry,
134 )
135 .unwrap();
136
137 let opts = histogram_opts!(
138 "source_cdc_event_lag_duration_milliseconds",
139 "source_cdc_lag_latency",
140 exponential_buckets(1.0, 2.0, 21).unwrap(), );
142
143 let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
144 "parquet_source_skip_row_count",
145 "Total number of rows that have been set to null in parquet source",
146 &["actor_id", "source_id", "source_name", "fragment_id"],
147 registry
148 )
149 .unwrap();
150
151 let direct_cdc_event_lag_latency =
152 register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
153
154 let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
155
156 let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
157 "file_source_input_row_count",
158 "Total number of rows that have been read in file source",
159 &["source_id", "source_name", "actor_id", "fragment_id"],
160 registry
161 )
162 .unwrap();
163
164 let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
165 "kinesis_throughput_exceeded_count",
166 "Total number of times throughput exceeded in kinesis source",
167 &["source_id", "source_name", "fragment_id", "shard_id"],
168 registry
169 )
170 .unwrap();
171
172 let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
173 "kinesis_timeout_count",
174 "Total number of times timeout in kinesis source",
175 &["source_id", "source_name", "fragment_id", "shard_id"],
176 registry
177 )
178 .unwrap();
179
180 let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
181 "kinesis_rebuild_shard_iter_count",
182 "Total number of times rebuild shard iter in kinesis source",
183 &["source_id", "source_name", "fragment_id", "shard_id"],
184 registry
185 )
186 .unwrap();
187
188 let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
189 "kinesis_early_terminate_shard_count",
190 "Total number of times early terminate shard in kinesis source",
191 &["source_id", "source_name", "fragment_id", "shard_id"],
192 registry
193 )
194 .unwrap();
195
196 let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
197 "kinesis_lag_latency_ms",
198 "Lag latency in kinesis source",
199 &["source_id", "source_name", "fragment_id", "shard_id"],
200 registry
201 )
202 .unwrap();
203
204 SourceMetrics {
205 partition_input_count,
206 partition_input_bytes,
207 latest_message_id,
208 rdkafka_native_metric,
209 direct_cdc_event_lag_latency,
210 parquet_source_skip_row_count,
211 file_source_input_row_count,
212
213 kinesis_throughput_exceeded_count,
214 kinesis_timeout_count,
215 kinesis_rebuild_shard_iter_count,
216 kinesis_early_terminate_shard_count,
217 kinesis_lag_latency_ms,
218 }
219 }
220}
221
222impl Default for SourceMetrics {
223 fn default() -> Self {
224 GLOBAL_SOURCE_METRICS.clone()
225 }
226}