// risingwave_connector/source/monitor/metrics.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, LazyLock};
16
17use prometheus::{Registry, exponential_buckets, histogram_opts};
18use risingwave_common::metrics::{
19    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
20};
21use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
22use risingwave_common::{
23    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
24    register_guarded_int_gauge_vec_with_registry,
25};
26
27use crate::source::kafka::stats::RdKafkaStats;
28
/// Metric handles reported from source split enumerators, covering Kafka
/// watermark progress and CDC upstream progress tracking.
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    /// Kafka high watermark, labeled by `source_id` / `partition`.
    pub high_watermark: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC confirmed flush LSN monitoring
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
    /// MySQL CDC binlog file sequence number (min)
    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
    /// MySQL CDC binlog file sequence number (max)
    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
}
39
/// Process-wide [`EnumeratorMetrics`], lazily registered against the global
/// metrics registry on first access.
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
42
43impl EnumeratorMetrics {
44    fn new(registry: &Registry) -> Self {
45        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
46            "source_kafka_high_watermark",
47            "High watermark for a exec per partition",
48            &["source_id", "partition"],
49            registry,
50        )
51        .unwrap();
52
53        let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
54            "pg_cdc_confirmed_flush_lsn",
55            "PostgreSQL CDC confirmed flush LSN",
56            &["source_id", "slot_name"],
57            registry,
58        )
59        .unwrap();
60
61        let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
62            "mysql_cdc_binlog_file_seq_min",
63            "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
64            &["hostname", "port"],
65            registry,
66        )
67        .unwrap();
68
69        let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
70            "mysql_cdc_binlog_file_seq_max",
71            "MySQL CDC upstream binlog file sequence number (maximum/newest)",
72            &["hostname", "port"],
73            registry,
74        )
75        .unwrap();
76
77        EnumeratorMetrics {
78            high_watermark,
79            pg_cdc_confirmed_flush_lsn,
80            mysql_cdc_binlog_file_seq_min,
81            mysql_cdc_binlog_file_seq_max,
82        }
83    }
84
85    pub fn unused() -> Self {
86        Default::default()
87    }
88}
89
90impl Default for EnumeratorMetrics {
91    fn default() -> Self {
92        GLOBAL_ENUMERATOR_METRICS.clone()
93    }
94}
95
/// Metric handles reported from running source executors: per-partition
/// throughput, CDC event lag, file-source split accounting, Kinesis-specific
/// counters, and native rdkafka statistics.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    /// Total rows ingested, labeled by actor/source/partition/fragment.
    pub partition_input_count: LabelGuardedIntCounterVec,

    /// Total bytes ingested from a specific partition.
    ///
    /// **Note**: for normal messages, the metric is the message's payload size.
    /// For messages from load generator, the metric is the size of stream chunk.
    pub partition_input_bytes: LabelGuardedIntCounterVec,
    /// Report latest message id
    pub latest_message_id: LabelGuardedIntGaugeVec,
    /// Statistics emitted by the native librdkafka client.
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    /// CDC event lag histogram in milliseconds, labeled by table name.
    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    /// Rows nulled out while reading parquet files.
    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    /// Rows read by the file source.
    pub file_source_input_row_count: LabelGuardedIntCounterVec,
    /// Current number of dirty splits in the file source (gauge).
    pub file_source_dirty_split_count: LabelGuardedIntGaugeVec,
    /// Total splits ever marked dirty in the file source (counter).
    pub file_source_failed_split_count: LabelGuardedIntCounterVec,

    // kinesis source
    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,
}
121
/// Process-wide [`SourceMetrics`], lazily registered against the global
/// metrics registry on first access.
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
124
125impl SourceMetrics {
126    fn new(registry: &Registry) -> Self {
127        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
128            "source_partition_input_count",
129            "Total number of rows that have been input from specific partition",
130            &[
131                "actor_id",
132                "source_id",
133                "partition",
134                "source_name",
135                "fragment_id"
136            ],
137            registry
138        )
139        .unwrap();
140        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
141            "source_partition_input_bytes",
142            "Total bytes that have been input from specific partition",
143            &[
144                "actor_id",
145                "source_id",
146                "partition",
147                "source_name",
148                "fragment_id"
149            ],
150            registry
151        )
152        .unwrap();
153        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
154            "source_latest_message_id",
155            "Latest message id for a exec per partition",
156            &["source_id", "actor_id", "partition"],
157            registry,
158        )
159        .unwrap();
160
161        let opts = histogram_opts!(
162            "source_cdc_event_lag_duration_milliseconds",
163            "source_cdc_lag_latency",
164            exponential_buckets(1.0, 2.0, 21).unwrap(), // max 1048s
165        );
166
167        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
168            "parquet_source_skip_row_count",
169            "Total number of rows that have been set to null in parquet source",
170            &["actor_id", "source_id", "source_name", "fragment_id"],
171            registry
172        )
173        .unwrap();
174
175        let direct_cdc_event_lag_latency =
176            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
177
178        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
179
180        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
181            "file_source_input_row_count",
182            "Total number of rows that have been read in file source",
183            &["source_id", "source_name", "actor_id", "fragment_id"],
184            registry
185        )
186        .unwrap();
187        let file_source_dirty_split_count = register_guarded_int_gauge_vec_with_registry!(
188            "file_source_dirty_split_count",
189            "Current number of dirty file splits in file source",
190            &["source_id", "source_name", "actor_id", "fragment_id"],
191            registry
192        )
193        .unwrap();
194        let file_source_failed_split_count = register_guarded_int_counter_vec_with_registry!(
195            "file_source_failed_split_count",
196            "Total number of file splits marked dirty in file source",
197            &["source_id", "source_name", "actor_id", "fragment_id"],
198            registry
199        )
200        .unwrap();
201
202        let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
203            "kinesis_throughput_exceeded_count",
204            "Total number of times throughput exceeded in kinesis source",
205            &["source_id", "source_name", "fragment_id", "shard_id"],
206            registry
207        )
208        .unwrap();
209
210        let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
211            "kinesis_timeout_count",
212            "Total number of times timeout in kinesis source",
213            &["source_id", "source_name", "fragment_id", "shard_id"],
214            registry
215        )
216        .unwrap();
217
218        let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
219            "kinesis_rebuild_shard_iter_count",
220            "Total number of times rebuild shard iter in kinesis source",
221            &["source_id", "source_name", "fragment_id", "shard_id"],
222            registry
223        )
224        .unwrap();
225
226        let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
227            "kinesis_early_terminate_shard_count",
228            "Total number of times early terminate shard in kinesis source",
229            &["source_id", "source_name", "fragment_id", "shard_id"],
230            registry
231        )
232        .unwrap();
233
234        let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
235            "kinesis_lag_latency_ms",
236            "Lag latency in kinesis source",
237            &["source_id", "source_name", "fragment_id", "shard_id"],
238            registry
239        )
240        .unwrap();
241
242        SourceMetrics {
243            partition_input_count,
244            partition_input_bytes,
245            latest_message_id,
246            rdkafka_native_metric,
247            direct_cdc_event_lag_latency,
248            parquet_source_skip_row_count,
249            file_source_input_row_count,
250            file_source_dirty_split_count,
251            file_source_failed_split_count,
252
253            kinesis_throughput_exceeded_count,
254            kinesis_timeout_count,
255            kinesis_rebuild_shard_iter_count,
256            kinesis_early_terminate_shard_count,
257            kinesis_lag_latency_ms,
258        }
259    }
260}
261
262impl Default for SourceMetrics {
263    fn default() -> Self {
264        GLOBAL_SOURCE_METRICS.clone()
265    }
266}