risingwave_connector/source/monitor/metrics.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, LazyLock};
16
17use prometheus::{Registry, exponential_buckets, histogram_opts};
18use risingwave_common::metrics::{
19    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
20};
21use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
22use risingwave_common::{
23    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
24    register_guarded_int_gauge_vec_with_registry,
25};
26
27use crate::source::kafka::stats::RdKafkaStats;
28
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    /// Kafka high watermark, labeled by `(source_id, partition)`.
    pub high_watermark: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC confirmed flush LSN monitoring,
    /// labeled by `(source_id, slot_name)`.
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC upstream max LSN monitoring (`pg_current_wal_lsn`),
    /// labeled by `(source_id, slot_name)`.
    pub pg_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
    /// MySQL CDC binlog file sequence number (min/oldest),
    /// labeled by `(hostname, port)`.
    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
    /// MySQL CDC binlog file sequence number (max/newest),
    /// labeled by `(hostname, port)`.
    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
}
41
/// Process-wide [`EnumeratorMetrics`], registered lazily in the global metrics
/// registry on first access. Obtain handles via [`EnumeratorMetrics::default`].
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
44
45impl EnumeratorMetrics {
46    fn new(registry: &Registry) -> Self {
47        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
48            "source_kafka_high_watermark",
49            "High watermark for a exec per partition",
50            &["source_id", "partition"],
51            registry,
52        )
53        .unwrap();
54
55        let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
56            "pg_cdc_confirmed_flush_lsn",
57            "PostgreSQL CDC confirmed flush LSN",
58            &["source_id", "slot_name"],
59            registry,
60        )
61        .unwrap();
62
63        let pg_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
64            "pg_cdc_upstream_max_lsn",
65            "PostgreSQL CDC upstream max LSN (pg_current_wal_lsn)",
66            &["source_id", "slot_name"],
67            registry,
68        )
69        .unwrap();
70
71        let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
72            "mysql_cdc_binlog_file_seq_min",
73            "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
74            &["hostname", "port"],
75            registry,
76        )
77        .unwrap();
78
79        let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
80            "mysql_cdc_binlog_file_seq_max",
81            "MySQL CDC upstream binlog file sequence number (maximum/newest)",
82            &["hostname", "port"],
83            registry,
84        )
85        .unwrap();
86
87        EnumeratorMetrics {
88            high_watermark,
89            pg_cdc_confirmed_flush_lsn,
90            pg_cdc_upstream_max_lsn,
91            mysql_cdc_binlog_file_seq_min,
92            mysql_cdc_binlog_file_seq_max,
93        }
94    }
95
96    pub fn unused() -> Self {
97        Default::default()
98    }
99}
100
101impl Default for EnumeratorMetrics {
102    fn default() -> Self {
103        GLOBAL_ENUMERATOR_METRICS.clone()
104    }
105}
106
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    /// Rows read per partition, labeled by
    /// `(actor_id, source_id, partition, source_name, fragment_id)`.
    pub partition_input_count: LabelGuardedIntCounterVec,

    // **Note**: for normal messages, the metric is the message's payload size.
    // For messages from load generator, the metric is the size of stream chunk.
    pub partition_input_bytes: LabelGuardedIntCounterVec,
    /// Report latest message id, labeled by `(source_id, actor_id, partition)`.
    pub latest_message_id: LabelGuardedIntGaugeVec,
    /// Native statistics exposed by librdkafka for Kafka sources.
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    /// CDC event lag histogram in milliseconds, labeled by `(table_name)`.
    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    // NOTE(review): name says "skip" but the registered help text says rows
    // are "set to null" — confirm which one is accurate.
    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    pub file_source_input_row_count: LabelGuardedIntCounterVec,
    pub file_source_dirty_split_count: LabelGuardedIntGaugeVec,
    pub file_source_failed_split_count: LabelGuardedIntCounterVec,

    // kinesis source; all labeled by
    // `(source_id, source_name, fragment_id, shard_id)`.
    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,
}
132
/// Process-wide [`SourceMetrics`], registered lazily in the global metrics
/// registry on first access. Obtain handles via [`SourceMetrics::default`].
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
135
136impl SourceMetrics {
137    fn new(registry: &Registry) -> Self {
138        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
139            "source_partition_input_count",
140            "Total number of rows that have been input from specific partition",
141            &[
142                "actor_id",
143                "source_id",
144                "partition",
145                "source_name",
146                "fragment_id"
147            ],
148            registry
149        )
150        .unwrap();
151        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
152            "source_partition_input_bytes",
153            "Total bytes that have been input from specific partition",
154            &[
155                "actor_id",
156                "source_id",
157                "partition",
158                "source_name",
159                "fragment_id"
160            ],
161            registry
162        )
163        .unwrap();
164        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
165            "source_latest_message_id",
166            "Latest message id for a exec per partition",
167            &["source_id", "actor_id", "partition"],
168            registry,
169        )
170        .unwrap();
171
172        let opts = histogram_opts!(
173            "source_cdc_event_lag_duration_milliseconds",
174            "source_cdc_lag_latency",
175            exponential_buckets(1.0, 2.0, 21).unwrap(), // max 1048s
176        );
177
178        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
179            "parquet_source_skip_row_count",
180            "Total number of rows that have been set to null in parquet source",
181            &["actor_id", "source_id", "source_name", "fragment_id"],
182            registry
183        )
184        .unwrap();
185
186        let direct_cdc_event_lag_latency =
187            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
188
189        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
190
191        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
192            "file_source_input_row_count",
193            "Total number of rows that have been read in file source",
194            &["source_id", "source_name", "actor_id", "fragment_id"],
195            registry
196        )
197        .unwrap();
198        let file_source_dirty_split_count = register_guarded_int_gauge_vec_with_registry!(
199            "file_source_dirty_split_count",
200            "Current number of dirty file splits in file source",
201            &["source_id", "source_name", "actor_id", "fragment_id"],
202            registry
203        )
204        .unwrap();
205        let file_source_failed_split_count = register_guarded_int_counter_vec_with_registry!(
206            "file_source_failed_split_count",
207            "Total number of file splits marked dirty in file source",
208            &["source_id", "source_name", "actor_id", "fragment_id"],
209            registry
210        )
211        .unwrap();
212
213        let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
214            "kinesis_throughput_exceeded_count",
215            "Total number of times throughput exceeded in kinesis source",
216            &["source_id", "source_name", "fragment_id", "shard_id"],
217            registry
218        )
219        .unwrap();
220
221        let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
222            "kinesis_timeout_count",
223            "Total number of times timeout in kinesis source",
224            &["source_id", "source_name", "fragment_id", "shard_id"],
225            registry
226        )
227        .unwrap();
228
229        let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
230            "kinesis_rebuild_shard_iter_count",
231            "Total number of times rebuild shard iter in kinesis source",
232            &["source_id", "source_name", "fragment_id", "shard_id"],
233            registry
234        )
235        .unwrap();
236
237        let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
238            "kinesis_early_terminate_shard_count",
239            "Total number of times early terminate shard in kinesis source",
240            &["source_id", "source_name", "fragment_id", "shard_id"],
241            registry
242        )
243        .unwrap();
244
245        let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
246            "kinesis_lag_latency_ms",
247            "Lag latency in kinesis source",
248            &["source_id", "source_name", "fragment_id", "shard_id"],
249            registry
250        )
251        .unwrap();
252
253        SourceMetrics {
254            partition_input_count,
255            partition_input_bytes,
256            latest_message_id,
257            rdkafka_native_metric,
258            direct_cdc_event_lag_latency,
259            parquet_source_skip_row_count,
260            file_source_input_row_count,
261            file_source_dirty_split_count,
262            file_source_failed_split_count,
263
264            kinesis_throughput_exceeded_count,
265            kinesis_timeout_count,
266            kinesis_rebuild_shard_iter_count,
267            kinesis_early_terminate_shard_count,
268            kinesis_lag_latency_ms,
269        }
270    }
271}
272
273impl Default for SourceMetrics {
274    fn default() -> Self {
275        GLOBAL_SOURCE_METRICS.clone()
276    }
277}