// File: risingwave_connector/source/monitor/metrics.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, LazyLock};
16
17use prometheus::{
18    IntCounterVec, Registry, exponential_buckets, histogram_opts,
19    register_int_counter_vec_with_registry,
20};
21use risingwave_common::metrics::{
22    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
23};
24use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
25use risingwave_common::{
26    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
27    register_guarded_int_gauge_vec_with_registry,
28};
29
30use crate::source::kafka::stats::RdKafkaStats;
31
/// Gauges reported by source split enumerators (upstream discovery/progress),
/// one field per connector-specific metric family. Cloning is cheap: the
/// underlying metric vecs are shared handles.
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    /// Kafka high watermark, labeled by `source_id` and `partition`.
    pub high_watermark: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC confirmed flush LSN monitoring
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC upstream max LSN monitoring
    pub pg_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
    /// MySQL CDC binlog file sequence number (min)
    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
    /// MySQL CDC binlog file sequence number (max)
    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
    /// SQL Server CDC upstream minimum LSN
    pub sqlserver_cdc_upstream_min_lsn: LabelGuardedIntGaugeVec,
    /// SQL Server CDC upstream maximum LSN
    pub sqlserver_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
}
48
/// Process-wide [`EnumeratorMetrics`] instance; the metric families are
/// registered against the global registry lazily, on first access.
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
51
52impl EnumeratorMetrics {
53    fn new(registry: &Registry) -> Self {
54        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
55            "source_kafka_high_watermark",
56            "High watermark for a exec per partition",
57            &["source_id", "partition"],
58            registry,
59        )
60        .unwrap();
61
62        let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
63            "pg_cdc_confirmed_flush_lsn",
64            "PostgreSQL CDC confirmed flush LSN",
65            &["source_id", "slot_name"],
66            registry,
67        )
68        .unwrap();
69
70        let pg_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
71            "pg_cdc_upstream_max_lsn",
72            "PostgreSQL CDC upstream max LSN (pg_current_wal_lsn)",
73            &["source_id", "slot_name"],
74            registry,
75        )
76        .unwrap();
77
78        let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
79            "mysql_cdc_binlog_file_seq_min",
80            "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
81            &["hostname", "port"],
82            registry,
83        )
84        .unwrap();
85
86        let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
87            "mysql_cdc_binlog_file_seq_max",
88            "MySQL CDC upstream binlog file sequence number (maximum/newest)",
89            &["hostname", "port"],
90            registry,
91        )
92        .unwrap();
93
94        let sqlserver_cdc_upstream_min_lsn = register_guarded_int_gauge_vec_with_registry!(
95            "sqlserver_cdc_upstream_min_lsn",
96            "SQL Server CDC upstream minimum LSN",
97            &["source_id"],
98            registry,
99        )
100        .unwrap();
101
102        let sqlserver_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
103            "sqlserver_cdc_upstream_max_lsn",
104            "SQL Server CDC upstream maximum LSN",
105            &["source_id"],
106            registry,
107        )
108        .unwrap();
109
110        EnumeratorMetrics {
111            high_watermark,
112            pg_cdc_confirmed_flush_lsn,
113            pg_cdc_upstream_max_lsn,
114            mysql_cdc_binlog_file_seq_min,
115            mysql_cdc_binlog_file_seq_max,
116            sqlserver_cdc_upstream_min_lsn,
117            sqlserver_cdc_upstream_max_lsn,
118        }
119    }
120
121    pub fn unused() -> Self {
122        Default::default()
123    }
124}
125
126impl Default for EnumeratorMetrics {
127    fn default() -> Self {
128        GLOBAL_ENUMERATOR_METRICS.clone()
129    }
130}
131
/// Per-source runtime metrics (reader throughput, CDC lag, connector-specific
/// counters). Cloning is cheap: the underlying metric vecs are shared handles.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    /// Rows read per partition, labeled by actor/source/partition/fragment.
    pub partition_input_count: LabelGuardedIntCounterVec,

    // **Note**: for normal messages, the metric is the message's payload size.
    // For messages from load generator, the metric is the size of stream chunk.
    pub partition_input_bytes: LabelGuardedIntCounterVec,
    /// Report latest message id
    pub latest_message_id: LabelGuardedIntGaugeVec,
    // EOF events observed per partition, and the offset at which EOF resolved.
    pub partition_eof_count: LabelGuardedIntCounterVec,
    pub partition_eof_offset: LabelGuardedIntGaugeVec,
    // librdkafka's native statistics, bridged into the same registry.
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    // CDC event lag histogram (milliseconds), labeled by table name.
    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    // File-based sources (parquet/fs): skipped rows, read rows, split health.
    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    pub file_source_input_row_count: LabelGuardedIntCounterVec,
    pub file_source_dirty_split_count: LabelGuardedIntGaugeVec,
    pub file_source_failed_split_count: LabelGuardedIntCounterVec,

    // kinesis source
    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,

    /// Total ack failures (RPC errors and timeouts) during checkpoint for source connectors.
    pub connector_ack_failure_count: IntCounterVec,
}
162
/// Process-wide [`SourceMetrics`] instance; the metric families are registered
/// against the global registry lazily, on first access.
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
165
166impl SourceMetrics {
167    fn new(registry: &Registry) -> Self {
168        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
169            "source_partition_input_count",
170            "Total number of rows that have been input from specific partition",
171            &[
172                "actor_id",
173                "source_id",
174                "partition",
175                "source_name",
176                "fragment_id"
177            ],
178            registry
179        )
180        .unwrap();
181        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
182            "source_partition_input_bytes",
183            "Total bytes that have been input from specific partition",
184            &[
185                "actor_id",
186                "source_id",
187                "partition",
188                "source_name",
189                "fragment_id"
190            ],
191            registry
192        )
193        .unwrap();
194        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
195            "source_latest_message_id",
196            "Latest message id for a exec per partition",
197            &["source_id", "actor_id", "partition"],
198            registry,
199        )
200        .unwrap();
201        let partition_eof_count = register_guarded_int_counter_vec_with_registry!(
202            "source_partition_eof_count",
203            "Total number of EOF events received from specific partition",
204            &["source_id", "partition", "source_name", "fragment_id"],
205            registry
206        )
207        .unwrap();
208        let partition_eof_offset = register_guarded_int_gauge_vec_with_registry!(
209            "source_partition_eof_offset",
210            "Latest resolved EOF offset for specific partition",
211            &["source_id", "partition", "source_name", "fragment_id"],
212            registry
213        )
214        .unwrap();
215
216        let opts = histogram_opts!(
217            "source_cdc_event_lag_duration_milliseconds",
218            "source_cdc_lag_latency",
219            exponential_buckets(1.0, 2.0, 21).unwrap(), // max 1048s
220        );
221
222        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
223            "parquet_source_skip_row_count",
224            "Total number of rows that have been set to null in parquet source",
225            &["actor_id", "source_id", "source_name", "fragment_id"],
226            registry
227        )
228        .unwrap();
229
230        let direct_cdc_event_lag_latency =
231            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
232
233        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
234
235        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
236            "file_source_input_row_count",
237            "Total number of rows that have been read in file source",
238            &["source_id", "source_name", "actor_id", "fragment_id"],
239            registry
240        )
241        .unwrap();
242        let file_source_dirty_split_count = register_guarded_int_gauge_vec_with_registry!(
243            "file_source_dirty_split_count",
244            "Current number of dirty file splits in file source",
245            &["source_id", "source_name", "actor_id", "fragment_id"],
246            registry
247        )
248        .unwrap();
249        let file_source_failed_split_count = register_guarded_int_counter_vec_with_registry!(
250            "file_source_failed_split_count",
251            "Total number of file splits marked dirty in file source",
252            &["source_id", "source_name", "actor_id", "fragment_id"],
253            registry
254        )
255        .unwrap();
256
257        let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
258            "kinesis_throughput_exceeded_count",
259            "Total number of times throughput exceeded in kinesis source",
260            &["source_id", "source_name", "fragment_id", "shard_id"],
261            registry
262        )
263        .unwrap();
264
265        let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
266            "kinesis_timeout_count",
267            "Total number of times timeout in kinesis source",
268            &["source_id", "source_name", "fragment_id", "shard_id"],
269            registry
270        )
271        .unwrap();
272
273        let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
274            "kinesis_rebuild_shard_iter_count",
275            "Total number of times rebuild shard iter in kinesis source",
276            &["source_id", "source_name", "fragment_id", "shard_id"],
277            registry
278        )
279        .unwrap();
280
281        let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
282            "kinesis_early_terminate_shard_count",
283            "Total number of times early terminate shard in kinesis source",
284            &["source_id", "source_name", "fragment_id", "shard_id"],
285            registry
286        )
287        .unwrap();
288
289        let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
290            "kinesis_lag_latency_ms",
291            "Lag latency in kinesis source",
292            &["source_id", "source_name", "fragment_id", "shard_id"],
293            registry
294        )
295        .unwrap();
296
297        let connector_ack_failure_count = register_int_counter_vec_with_registry!(
298            "source_connector_ack_failure_count",
299            "Total number of ack failures during checkpoint for source connectors",
300            &["source_name", "connector_type", "error_type"],
301            registry
302        )
303        .unwrap();
304
305        SourceMetrics {
306            partition_input_count,
307            partition_input_bytes,
308            latest_message_id,
309            partition_eof_count,
310            partition_eof_offset,
311            rdkafka_native_metric,
312            direct_cdc_event_lag_latency,
313            parquet_source_skip_row_count,
314            file_source_input_row_count,
315            file_source_dirty_split_count,
316            file_source_failed_split_count,
317
318            kinesis_throughput_exceeded_count,
319            kinesis_timeout_count,
320            kinesis_rebuild_shard_iter_count,
321            kinesis_early_terminate_shard_count,
322            kinesis_lag_latency_ms,
323
324            connector_ack_failure_count,
325        }
326    }
327}
328
329impl Default for SourceMetrics {
330    fn default() -> Self {
331        GLOBAL_SOURCE_METRICS.clone()
332    }
333}