risingwave_connector/source/monitor/metrics.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::sync::{Arc, LazyLock};
16
17use prometheus::{
18    IntCounterVec, Registry, exponential_buckets, histogram_opts,
19    register_int_counter_vec_with_registry,
20};
21use risingwave_common::metrics::{
22    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
23};
24use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
25use risingwave_common::{
26    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
27    register_guarded_int_gauge_vec_with_registry,
28};
29
30use crate::source::kafka::stats::RdKafkaStats;
31
/// Gauges reported by source split enumerators, tracking upstream progress
/// markers (Kafka watermarks, CDC LSNs / binlog positions). Registered once
/// per registry via [`EnumeratorMetrics::new`]; cloning shares the underlying
/// collectors.
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    /// Kafka high watermark per partition (labels: `source_id`, `partition`).
    pub high_watermark: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC confirmed flush LSN monitoring
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC upstream max LSN monitoring
    pub pg_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
    /// MySQL CDC binlog file sequence number (min)
    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
    /// MySQL CDC binlog file sequence number (max)
    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
    /// SQL Server CDC upstream minimum LSN
    pub sqlserver_cdc_upstream_min_lsn: LabelGuardedIntGaugeVec,
    /// SQL Server CDC upstream maximum LSN
    pub sqlserver_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
}
48
/// Process-wide [`EnumeratorMetrics`] instance, lazily registered against
/// [`GLOBAL_METRICS_REGISTRY`] on first access.
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
51
52impl EnumeratorMetrics {
53    fn new(registry: &Registry) -> Self {
54        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
55            "source_kafka_high_watermark",
56            "High watermark for a exec per partition",
57            &["source_id", "partition"],
58            registry,
59        )
60        .unwrap();
61
62        let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
63            "pg_cdc_confirmed_flush_lsn",
64            "PostgreSQL CDC confirmed flush LSN",
65            &["source_id", "slot_name"],
66            registry,
67        )
68        .unwrap();
69
70        let pg_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
71            "pg_cdc_upstream_max_lsn",
72            "PostgreSQL CDC upstream max LSN (pg_current_wal_lsn)",
73            &["source_id", "slot_name"],
74            registry,
75        )
76        .unwrap();
77
78        let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
79            "mysql_cdc_binlog_file_seq_min",
80            "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
81            &["hostname", "port"],
82            registry,
83        )
84        .unwrap();
85
86        let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
87            "mysql_cdc_binlog_file_seq_max",
88            "MySQL CDC upstream binlog file sequence number (maximum/newest)",
89            &["hostname", "port"],
90            registry,
91        )
92        .unwrap();
93
94        let sqlserver_cdc_upstream_min_lsn = register_guarded_int_gauge_vec_with_registry!(
95            "sqlserver_cdc_upstream_min_lsn",
96            "SQL Server CDC upstream minimum LSN",
97            &["source_id"],
98            registry,
99        )
100        .unwrap();
101
102        let sqlserver_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
103            "sqlserver_cdc_upstream_max_lsn",
104            "SQL Server CDC upstream maximum LSN",
105            &["source_id"],
106            registry,
107        )
108        .unwrap();
109
110        EnumeratorMetrics {
111            high_watermark,
112            pg_cdc_confirmed_flush_lsn,
113            pg_cdc_upstream_max_lsn,
114            mysql_cdc_binlog_file_seq_min,
115            mysql_cdc_binlog_file_seq_max,
116            sqlserver_cdc_upstream_min_lsn,
117            sqlserver_cdc_upstream_max_lsn,
118        }
119    }
120
121    pub fn unused() -> Self {
122        Default::default()
123    }
124}
125
impl Default for EnumeratorMetrics {
    /// Clones the process-wide instance instead of registering new collectors,
    /// so `Default` never attempts a duplicate registration.
    fn default() -> Self {
        GLOBAL_ENUMERATOR_METRICS.clone()
    }
}
131
/// Runtime metrics for source executors, covering generic partition input,
/// rdkafka native stats, CDC lag, file sources, and Kinesis. Registered once
/// per registry via [`SourceMetrics::new`]; cloning shares the collectors.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    /// Rows ingested per partition (labels: actor/source/partition/name/fragment).
    pub partition_input_count: LabelGuardedIntCounterVec,

    // **Note**: for normal messages, the metric is the message's payload size.
    // For messages from load generator, the metric is the size of stream chunk.
    pub partition_input_bytes: LabelGuardedIntCounterVec,
    /// Report latest message id
    pub latest_message_id: LabelGuardedIntGaugeVec,
    /// Native librdkafka statistics, collected via [`RdKafkaStats`].
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    /// Histogram of CDC event lag in milliseconds, labeled by table name.
    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    /// Rows set to null (skipped) while reading parquet files.
    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    /// Rows read by the file source.
    pub file_source_input_row_count: LabelGuardedIntCounterVec,
    /// Current number of dirty file splits (gauge).
    pub file_source_dirty_split_count: LabelGuardedIntGaugeVec,
    /// Total file splits marked dirty (monotonic counter).
    pub file_source_failed_split_count: LabelGuardedIntCounterVec,

    // kinesis source
    /// Times throughput was exceeded, per shard.
    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    /// Times a request timed out, per shard.
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    /// Times the shard iterator was rebuilt, per shard.
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    /// Times a shard was terminated early, per shard.
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    /// Lag latency distribution in milliseconds, per shard.
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,

    /// Total ack failures (RPC errors and timeouts) during checkpoint for source connectors.
    pub connector_ack_failure_count: IntCounterVec,
}
160
/// Process-wide [`SourceMetrics`] instance, lazily registered against
/// [`GLOBAL_METRICS_REGISTRY`] on first access.
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
163
164impl SourceMetrics {
165    fn new(registry: &Registry) -> Self {
166        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
167            "source_partition_input_count",
168            "Total number of rows that have been input from specific partition",
169            &[
170                "actor_id",
171                "source_id",
172                "partition",
173                "source_name",
174                "fragment_id"
175            ],
176            registry
177        )
178        .unwrap();
179        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
180            "source_partition_input_bytes",
181            "Total bytes that have been input from specific partition",
182            &[
183                "actor_id",
184                "source_id",
185                "partition",
186                "source_name",
187                "fragment_id"
188            ],
189            registry
190        )
191        .unwrap();
192        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
193            "source_latest_message_id",
194            "Latest message id for a exec per partition",
195            &["source_id", "actor_id", "partition"],
196            registry,
197        )
198        .unwrap();
199
200        let opts = histogram_opts!(
201            "source_cdc_event_lag_duration_milliseconds",
202            "source_cdc_lag_latency",
203            exponential_buckets(1.0, 2.0, 21).unwrap(), // max 1048s
204        );
205
206        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
207            "parquet_source_skip_row_count",
208            "Total number of rows that have been set to null in parquet source",
209            &["actor_id", "source_id", "source_name", "fragment_id"],
210            registry
211        )
212        .unwrap();
213
214        let direct_cdc_event_lag_latency =
215            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
216
217        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
218
219        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
220            "file_source_input_row_count",
221            "Total number of rows that have been read in file source",
222            &["source_id", "source_name", "actor_id", "fragment_id"],
223            registry
224        )
225        .unwrap();
226        let file_source_dirty_split_count = register_guarded_int_gauge_vec_with_registry!(
227            "file_source_dirty_split_count",
228            "Current number of dirty file splits in file source",
229            &["source_id", "source_name", "actor_id", "fragment_id"],
230            registry
231        )
232        .unwrap();
233        let file_source_failed_split_count = register_guarded_int_counter_vec_with_registry!(
234            "file_source_failed_split_count",
235            "Total number of file splits marked dirty in file source",
236            &["source_id", "source_name", "actor_id", "fragment_id"],
237            registry
238        )
239        .unwrap();
240
241        let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
242            "kinesis_throughput_exceeded_count",
243            "Total number of times throughput exceeded in kinesis source",
244            &["source_id", "source_name", "fragment_id", "shard_id"],
245            registry
246        )
247        .unwrap();
248
249        let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
250            "kinesis_timeout_count",
251            "Total number of times timeout in kinesis source",
252            &["source_id", "source_name", "fragment_id", "shard_id"],
253            registry
254        )
255        .unwrap();
256
257        let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
258            "kinesis_rebuild_shard_iter_count",
259            "Total number of times rebuild shard iter in kinesis source",
260            &["source_id", "source_name", "fragment_id", "shard_id"],
261            registry
262        )
263        .unwrap();
264
265        let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
266            "kinesis_early_terminate_shard_count",
267            "Total number of times early terminate shard in kinesis source",
268            &["source_id", "source_name", "fragment_id", "shard_id"],
269            registry
270        )
271        .unwrap();
272
273        let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
274            "kinesis_lag_latency_ms",
275            "Lag latency in kinesis source",
276            &["source_id", "source_name", "fragment_id", "shard_id"],
277            registry
278        )
279        .unwrap();
280
281        let connector_ack_failure_count = register_int_counter_vec_with_registry!(
282            "source_connector_ack_failure_count",
283            "Total number of ack failures during checkpoint for source connectors",
284            &["source_name", "connector_type", "error_type"],
285            registry
286        )
287        .unwrap();
288
289        SourceMetrics {
290            partition_input_count,
291            partition_input_bytes,
292            latest_message_id,
293            rdkafka_native_metric,
294            direct_cdc_event_lag_latency,
295            parquet_source_skip_row_count,
296            file_source_input_row_count,
297            file_source_dirty_split_count,
298            file_source_failed_split_count,
299
300            kinesis_throughput_exceeded_count,
301            kinesis_timeout_count,
302            kinesis_rebuild_shard_iter_count,
303            kinesis_early_terminate_shard_count,
304            kinesis_lag_latency_ms,
305
306            connector_ack_failure_count,
307        }
308    }
309}
310
impl Default for SourceMetrics {
    /// Clones the process-wide instance instead of registering new collectors,
    /// so `Default` never attempts a duplicate registration.
    fn default() -> Self {
        GLOBAL_SOURCE_METRICS.clone()
    }
}