risingwave_connector/source/monitor/metrics.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, LazyLock};

use prometheus::{Registry, exponential_buckets, histogram_opts};
use risingwave_common::metrics::{
    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};

use crate::source::kafka::stats::RdKafkaStats;

#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    pub high_watermark: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC confirmed flush LSN monitoring
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
    /// MySQL CDC binlog file sequence number (min)
    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
    /// MySQL CDC binlog file sequence number (max)
    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
}

pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl EnumeratorMetrics {
    fn new(registry: &Registry) -> Self {
        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
            "source_kafka_high_watermark",
            "High watermark for an exec per partition",
            &["source_id", "partition"],
            registry,
        )
        .unwrap();

        let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
            "pg_cdc_confirmed_flush_lsn",
            "PostgreSQL CDC confirmed flush LSN",
            &["source_id", "slot_name"],
            registry,
        )
        .unwrap();

        let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
            "mysql_cdc_binlog_file_seq_min",
            "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
            &["hostname", "port"],
            registry,
        )
        .unwrap();

        let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
            "mysql_cdc_binlog_file_seq_max",
            "MySQL CDC upstream binlog file sequence number (maximum/newest)",
            &["hostname", "port"],
            registry,
        )
        .unwrap();

        EnumeratorMetrics {
            high_watermark,
            pg_cdc_confirmed_flush_lsn,
            mysql_cdc_binlog_file_seq_min,
            mysql_cdc_binlog_file_seq_max,
        }
    }

    pub fn unused() -> Self {
        Default::default()
    }
}

impl Default for EnumeratorMetrics {
    fn default() -> Self {
        GLOBAL_ENUMERATOR_METRICS.clone()
    }
}

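// A minimal usage sketch, not part of the original module: it assumes the
// `with_guarded_label_values` accessor that `risingwave_common::metrics` exposes for
// label-guarded vectors, and that the guarded gauge derefs to a prometheus `IntGauge`.
// The helper name and signature below are hypothetical; a Kafka enumerator would report
// the per-partition high watermark roughly like this.
#[allow(dead_code)]
fn report_high_watermark_sketch(
    metrics: &EnumeratorMetrics,
    source_id: u32,
    partition: &str,
    high_watermark: i64,
) {
    let source_id = source_id.to_string();
    metrics
        .high_watermark
        .with_guarded_label_values(&[source_id.as_str(), partition])
        .set(high_watermark);
}
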
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    pub partition_input_count: LabelGuardedIntCounterVec,

    /// **Note**: for normal messages, the metric is the message's payload size.
    /// For messages from the load generator, it is the size of the stream chunk.
    pub partition_input_bytes: LabelGuardedIntCounterVec,
    /// Report latest message id
    pub latest_message_id: LabelGuardedIntGaugeVec,
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    pub file_source_input_row_count: LabelGuardedIntCounterVec,

    // kinesis source
    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,
}

pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));

impl SourceMetrics {
    fn new(registry: &Registry) -> Self {
        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
            "source_partition_input_count",
            "Total number of rows that have been input from a specific partition",
            &[
                "actor_id",
                "source_id",
                "partition",
                "source_name",
                "fragment_id"
            ],
            registry
        )
        .unwrap();
        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
            "source_partition_input_bytes",
            "Total bytes that have been input from a specific partition",
            &[
                "actor_id",
                "source_id",
                "partition",
                "source_name",
                "fragment_id"
            ],
            registry
        )
        .unwrap();
        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
            "source_latest_message_id",
            "Latest message id for an exec per partition",
            &["source_id", "actor_id", "partition"],
            registry,
        )
        .unwrap();

        let opts = histogram_opts!(
            "source_cdc_event_lag_duration_milliseconds",
            "Direct CDC event lag latency in milliseconds",
            exponential_buckets(1.0, 2.0, 21).unwrap(), // max bucket ~1048 s (2^20 ms)
        );

        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
            "parquet_source_skip_row_count",
            "Total number of rows that have been set to null in the parquet source",
            &["actor_id", "source_id", "source_name", "fragment_id"],
            registry
        )
        .unwrap();

        let direct_cdc_event_lag_latency =
            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();

        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));

        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
            "file_source_input_row_count",
            "Total number of rows that have been read in the file source",
            &["source_id", "source_name", "actor_id", "fragment_id"],
            registry
        )
        .unwrap();

        let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_throughput_exceeded_count",
            "Total number of times throughput was exceeded in the Kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_timeout_count",
            "Total number of timeouts in the Kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_rebuild_shard_iter_count",
            "Total number of times the shard iterator was rebuilt in the Kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
            "kinesis_early_terminate_shard_count",
            "Total number of times a shard was terminated early in the Kinesis source",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
            "kinesis_lag_latency_ms",
            "Lag latency of the Kinesis source, in milliseconds",
            &["source_id", "source_name", "fragment_id", "shard_id"],
            registry
        )
        .unwrap();

        SourceMetrics {
            partition_input_count,
            partition_input_bytes,
            latest_message_id,
            rdkafka_native_metric,
            direct_cdc_event_lag_latency,
            parquet_source_skip_row_count,
            file_source_input_row_count,

            kinesis_throughput_exceeded_count,
            kinesis_timeout_count,
            kinesis_rebuild_shard_iter_count,
            kinesis_early_terminate_shard_count,
            kinesis_lag_latency_ms,
        }
    }
}

impl Default for SourceMetrics {
    fn default() -> Self {
        GLOBAL_SOURCE_METRICS.clone()
    }
}
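
// A smoke-test sketch that is not in the original file. Assumptions are noted inline:
// `with_guarded_label_values` is assumed to be the accessor exposed by
// `risingwave_common::metrics`, and the guarded handles are assumed to deref to the
// underlying prometheus counter/histogram types.
#[cfg(test)]
mod tests {
    use prometheus::Registry;

    use super::*;

    #[test]
    fn source_metrics_register_and_report() {
        // A fresh registry avoids clashing with GLOBAL_METRICS_REGISTRY in tests.
        let registry = Registry::new();
        let metrics = SourceMetrics::new(&registry);

        // Labels follow the order declared above:
        // ["actor_id", "source_id", "partition", "source_name", "fragment_id"].
        metrics
            .partition_input_count
            .with_guarded_label_values(&["1", "2", "0", "my_source", "3"])
            .inc_by(42);

        // ["source_id", "source_name", "fragment_id", "shard_id"].
        metrics
            .kinesis_lag_latency_ms
            .with_guarded_label_values(&["2", "my_source", "3", "shardId-000000000000"])
            .observe(15.0);

        // The series touched above should now be visible through the registry.
        assert!(!registry.gather().is_empty());
    }
}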