Skip to main content

risingwave_connector/source/monitor/
metrics.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, LazyLock};
16
17use prometheus::{
18    IntCounterVec, Registry, exponential_buckets, histogram_opts,
19    register_int_counter_vec_with_registry,
20};
21use risingwave_common::metrics::{
22    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
23};
24use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
25use risingwave_common::{
26    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
27    register_guarded_int_gauge_vec_with_registry,
28};
29
30use crate::source::kafka::stats::RdKafkaStats;
31
/// Low-cardinality connector ack failure categories.
///
/// Each variant maps to one fixed label value (see [`ConnectorAckFailureType::as_str`]).
///
/// Keep this list bounded. Do not add raw connector error messages, topics,
/// partitions, or split identifiers as metric label values.
///
/// The type is `Copy` and additionally implements `PartialEq`, `Eq` and `Hash`
/// so categories can be compared directly and used as map or set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectorAckFailureType {
    Error,
    Timeout,
    EmptyMessageId,
    ChannelMissing,
    ChannelSendError,
    DecodeError,
    BrokerError,
}

impl ConnectorAckFailureType {
    /// Returns the fixed, snake_case label value for this failure category.
    ///
    /// The match is exhaustive on purpose: adding a variant without choosing
    /// its label is a compile error. Being a `const fn`, the label can also be
    /// evaluated in constant contexts.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Error => "error",
            Self::Timeout => "timeout",
            Self::EmptyMessageId => "empty_message_id",
            Self::ChannelMissing => "channel_missing",
            Self::ChannelSendError => "channel_send_error",
            Self::DecodeError => "decode_error",
            Self::BrokerError => "broker_error",
        }
    }
}
60
61#[derive(Debug, Clone)]
62pub struct EnumeratorMetrics {
63    pub high_watermark: LabelGuardedIntGaugeVec,
64    /// PostgreSQL CDC confirmed flush LSN monitoring
65    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
66    /// PostgreSQL CDC upstream max LSN monitoring
67    pub pg_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
68    /// MySQL CDC binlog file sequence number (min)
69    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
70    /// MySQL CDC binlog file sequence number (max)
71    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
72    /// SQL Server CDC upstream minimum LSN
73    pub sqlserver_cdc_upstream_min_lsn: LabelGuardedIntGaugeVec,
74    /// SQL Server CDC upstream maximum LSN
75    pub sqlserver_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
76}
77
78pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
79    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
80
81impl EnumeratorMetrics {
82    fn new(registry: &Registry) -> Self {
83        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
84            "source_kafka_high_watermark",
85            "High watermark for a exec per partition",
86            &["source_id", "partition"],
87            registry,
88        )
89        .unwrap();
90
91        let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
92            "pg_cdc_confirmed_flush_lsn",
93            "PostgreSQL CDC confirmed flush LSN",
94            &["source_id", "slot_name"],
95            registry,
96        )
97        .unwrap();
98
99        let pg_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
100            "pg_cdc_upstream_max_lsn",
101            "PostgreSQL CDC upstream max LSN (pg_current_wal_lsn)",
102            &["source_id", "slot_name"],
103            registry,
104        )
105        .unwrap();
106
107        let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
108            "mysql_cdc_binlog_file_seq_min",
109            "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
110            &["hostname", "port"],
111            registry,
112        )
113        .unwrap();
114
115        let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
116            "mysql_cdc_binlog_file_seq_max",
117            "MySQL CDC upstream binlog file sequence number (maximum/newest)",
118            &["hostname", "port"],
119            registry,
120        )
121        .unwrap();
122
123        let sqlserver_cdc_upstream_min_lsn = register_guarded_int_gauge_vec_with_registry!(
124            "sqlserver_cdc_upstream_min_lsn",
125            "SQL Server CDC upstream minimum LSN",
126            &["source_id"],
127            registry,
128        )
129        .unwrap();
130
131        let sqlserver_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
132            "sqlserver_cdc_upstream_max_lsn",
133            "SQL Server CDC upstream maximum LSN",
134            &["source_id"],
135            registry,
136        )
137        .unwrap();
138
139        EnumeratorMetrics {
140            high_watermark,
141            pg_cdc_confirmed_flush_lsn,
142            pg_cdc_upstream_max_lsn,
143            mysql_cdc_binlog_file_seq_min,
144            mysql_cdc_binlog_file_seq_max,
145            sqlserver_cdc_upstream_min_lsn,
146            sqlserver_cdc_upstream_max_lsn,
147        }
148    }
149
150    pub fn unused() -> Self {
151        Default::default()
152    }
153}
154
155impl Default for EnumeratorMetrics {
156    fn default() -> Self {
157        GLOBAL_ENUMERATOR_METRICS.clone()
158    }
159}
160
161#[derive(Debug, Clone)]
162pub struct SourceMetrics {
163    pub partition_input_count: LabelGuardedIntCounterVec,
164
165    // **Note**: for normal messages, the metric is the message's payload size.
166    // For messages from load generator, the metric is the size of stream chunk.
167    pub partition_input_bytes: LabelGuardedIntCounterVec,
168    /// Report latest message id
169    pub latest_message_id: LabelGuardedIntGaugeVec,
170    pub partition_eof_count: LabelGuardedIntCounterVec,
171    pub partition_eof_offset: LabelGuardedIntGaugeVec,
172    pub rdkafka_native_metric: Arc<RdKafkaStats>,
173
174    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,
175
176    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
177    pub file_source_input_row_count: LabelGuardedIntCounterVec,
178    pub file_source_dirty_split_count: LabelGuardedIntGaugeVec,
179    pub file_source_failed_split_count: LabelGuardedIntCounterVec,
180
181    // kinesis source
182    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
183    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
184    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
185    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
186    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,
187
188    /// Total connector ack failures after checkpoint commit by bounded failure category.
189    connector_ack_failure_count: IntCounterVec,
190    /// Total successful connector acks after checkpoint commit.
191    connector_ack_success_count: IntCounterVec,
192}
193
194pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
195    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
196
197impl SourceMetrics {
198    pub fn inc_connector_ack_failure_count(
199        &self,
200        source_name: &str,
201        connector_type: &'static str,
202        failure_type: ConnectorAckFailureType,
203    ) {
204        self.connector_ack_failure_count
205            .with_label_values(&[source_name, connector_type, failure_type.as_str()])
206            .inc();
207    }
208
209    pub fn inc_connector_ack_success_count(&self, source_name: &str, connector_type: &'static str) {
210        self.connector_ack_success_count
211            .with_label_values(&[source_name, connector_type])
212            .inc();
213    }
214
215    fn new(registry: &Registry) -> Self {
216        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
217            "source_partition_input_count",
218            "Total number of rows that have been input from specific partition",
219            &[
220                "actor_id",
221                "source_id",
222                "partition",
223                "source_name",
224                "fragment_id"
225            ],
226            registry
227        )
228        .unwrap();
229        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
230            "source_partition_input_bytes",
231            "Total bytes that have been input from specific partition",
232            &[
233                "actor_id",
234                "source_id",
235                "partition",
236                "source_name",
237                "fragment_id"
238            ],
239            registry
240        )
241        .unwrap();
242        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
243            "source_latest_message_id",
244            "Latest message id for a exec per partition",
245            &["source_id", "actor_id", "partition"],
246            registry,
247        )
248        .unwrap();
249        let partition_eof_count = register_guarded_int_counter_vec_with_registry!(
250            "source_partition_eof_count",
251            "Total number of EOF events received from specific partition",
252            &["source_id", "partition", "source_name", "fragment_id"],
253            registry
254        )
255        .unwrap();
256        let partition_eof_offset = register_guarded_int_gauge_vec_with_registry!(
257            "source_partition_eof_offset",
258            "Latest resolved EOF offset for specific partition",
259            &["source_id", "partition", "source_name", "fragment_id"],
260            registry
261        )
262        .unwrap();
263
264        let opts = histogram_opts!(
265            "source_cdc_event_lag_duration_milliseconds",
266            "source_cdc_lag_latency",
267            exponential_buckets(1.0, 2.0, 21).unwrap(), // max 1048s
268        );
269
270        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
271            "parquet_source_skip_row_count",
272            "Total number of rows that have been set to null in parquet source",
273            &["actor_id", "source_id", "source_name", "fragment_id"],
274            registry
275        )
276        .unwrap();
277
278        let direct_cdc_event_lag_latency =
279            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
280
281        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
282
283        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
284            "file_source_input_row_count",
285            "Total number of rows that have been read in file source",
286            &["source_id", "source_name", "actor_id", "fragment_id"],
287            registry
288        )
289        .unwrap();
290        let file_source_dirty_split_count = register_guarded_int_gauge_vec_with_registry!(
291            "file_source_dirty_split_count",
292            "Current number of dirty file splits in file source",
293            &["source_id", "source_name", "actor_id", "fragment_id"],
294            registry
295        )
296        .unwrap();
297        let file_source_failed_split_count = register_guarded_int_counter_vec_with_registry!(
298            "file_source_failed_split_count",
299            "Total number of file splits marked dirty in file source",
300            &["source_id", "source_name", "actor_id", "fragment_id"],
301            registry
302        )
303        .unwrap();
304
305        let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
306            "kinesis_throughput_exceeded_count",
307            "Total number of times throughput exceeded in kinesis source",
308            &["source_id", "source_name", "fragment_id", "shard_id"],
309            registry
310        )
311        .unwrap();
312
313        let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
314            "kinesis_timeout_count",
315            "Total number of times timeout in kinesis source",
316            &["source_id", "source_name", "fragment_id", "shard_id"],
317            registry
318        )
319        .unwrap();
320
321        let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
322            "kinesis_rebuild_shard_iter_count",
323            "Total number of times rebuild shard iter in kinesis source",
324            &["source_id", "source_name", "fragment_id", "shard_id"],
325            registry
326        )
327        .unwrap();
328
329        let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
330            "kinesis_early_terminate_shard_count",
331            "Total number of times early terminate shard in kinesis source",
332            &["source_id", "source_name", "fragment_id", "shard_id"],
333            registry
334        )
335        .unwrap();
336
337        let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
338            "kinesis_lag_latency_ms",
339            "Lag latency in kinesis source",
340            &["source_id", "source_name", "fragment_id", "shard_id"],
341            registry
342        )
343        .unwrap();
344
345        let connector_ack_failure_count = register_int_counter_vec_with_registry!(
346            "source_connector_ack_failure_count",
347            "Total number of connector ack failures after checkpoint commit by bounded failure category",
348            &["source_name", "connector_type", "error_type"],
349            registry
350        )
351        .unwrap();
352        let connector_ack_success_count = register_int_counter_vec_with_registry!(
353            "source_connector_ack_success_count",
354            "Total number of successful connector acks after checkpoint commit",
355            &["source_name", "connector_type"],
356            registry
357        )
358        .unwrap();
359
360        SourceMetrics {
361            partition_input_count,
362            partition_input_bytes,
363            latest_message_id,
364            partition_eof_count,
365            partition_eof_offset,
366            rdkafka_native_metric,
367            direct_cdc_event_lag_latency,
368            parquet_source_skip_row_count,
369            file_source_input_row_count,
370            file_source_dirty_split_count,
371            file_source_failed_split_count,
372
373            kinesis_throughput_exceeded_count,
374            kinesis_timeout_count,
375            kinesis_rebuild_shard_iter_count,
376            kinesis_early_terminate_shard_count,
377            kinesis_lag_latency_ms,
378
379            connector_ack_failure_count,
380            connector_ack_success_count,
381        }
382    }
383}
384
385impl Default for SourceMetrics {
386    fn default() -> Self {
387        GLOBAL_SOURCE_METRICS.clone()
388    }
389}