// risingwave_connector/source/monitor/metrics.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, LazyLock};
16
17use prometheus::{Registry, exponential_buckets, histogram_opts};
18use risingwave_common::metrics::{
19    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
20};
21use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
22use risingwave_common::{
23    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
24    register_guarded_int_gauge_vec_with_registry,
25};
26
27use crate::source::kafka::stats::RdKafkaStats;
28
/// Gauges describing upstream progress as observed by source enumerators:
/// Kafka high watermarks plus CDC positions (PostgreSQL/SQL Server LSNs and
/// MySQL binlog file sequence numbers).
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    /// Kafka high watermark, labeled per `(source_id, partition)`.
    pub high_watermark: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC confirmed flush LSN monitoring
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC upstream max LSN monitoring
    pub pg_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
    /// MySQL CDC binlog file sequence number (min)
    pub mysql_cdc_binlog_file_seq_min: LabelGuardedIntGaugeVec,
    /// MySQL CDC binlog file sequence number (max)
    pub mysql_cdc_binlog_file_seq_max: LabelGuardedIntGaugeVec,
    /// SQL Server CDC upstream minimum LSN
    pub sqlserver_cdc_upstream_min_lsn: LabelGuardedIntGaugeVec,
    /// SQL Server CDC upstream maximum LSN
    pub sqlserver_cdc_upstream_max_lsn: LabelGuardedIntGaugeVec,
}
45
/// Process-wide [`EnumeratorMetrics`], lazily registered against
/// [`GLOBAL_METRICS_REGISTRY`] on first access.
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
48
49impl EnumeratorMetrics {
50    fn new(registry: &Registry) -> Self {
51        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
52            "source_kafka_high_watermark",
53            "High watermark for a exec per partition",
54            &["source_id", "partition"],
55            registry,
56        )
57        .unwrap();
58
59        let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
60            "pg_cdc_confirmed_flush_lsn",
61            "PostgreSQL CDC confirmed flush LSN",
62            &["source_id", "slot_name"],
63            registry,
64        )
65        .unwrap();
66
67        let pg_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
68            "pg_cdc_upstream_max_lsn",
69            "PostgreSQL CDC upstream max LSN (pg_current_wal_lsn)",
70            &["source_id", "slot_name"],
71            registry,
72        )
73        .unwrap();
74
75        let mysql_cdc_binlog_file_seq_min = register_guarded_int_gauge_vec_with_registry!(
76            "mysql_cdc_binlog_file_seq_min",
77            "MySQL CDC upstream binlog file sequence number (minimum/oldest)",
78            &["hostname", "port"],
79            registry,
80        )
81        .unwrap();
82
83        let mysql_cdc_binlog_file_seq_max = register_guarded_int_gauge_vec_with_registry!(
84            "mysql_cdc_binlog_file_seq_max",
85            "MySQL CDC upstream binlog file sequence number (maximum/newest)",
86            &["hostname", "port"],
87            registry,
88        )
89        .unwrap();
90
91        let sqlserver_cdc_upstream_min_lsn = register_guarded_int_gauge_vec_with_registry!(
92            "sqlserver_cdc_upstream_min_lsn",
93            "SQL Server CDC upstream minimum LSN",
94            &["source_id"],
95            registry,
96        )
97        .unwrap();
98
99        let sqlserver_cdc_upstream_max_lsn = register_guarded_int_gauge_vec_with_registry!(
100            "sqlserver_cdc_upstream_max_lsn",
101            "SQL Server CDC upstream maximum LSN",
102            &["source_id"],
103            registry,
104        )
105        .unwrap();
106
107        EnumeratorMetrics {
108            high_watermark,
109            pg_cdc_confirmed_flush_lsn,
110            pg_cdc_upstream_max_lsn,
111            mysql_cdc_binlog_file_seq_min,
112            mysql_cdc_binlog_file_seq_max,
113            sqlserver_cdc_upstream_min_lsn,
114            sqlserver_cdc_upstream_max_lsn,
115        }
116    }
117
118    pub fn unused() -> Self {
119        Default::default()
120    }
121}
122
123impl Default for EnumeratorMetrics {
124    fn default() -> Self {
125        GLOBAL_ENUMERATOR_METRICS.clone()
126    }
127}
128
/// Per-source runtime metrics: row/byte throughput per partition, CDC lag,
/// file-source (incl. parquet) counters, Kinesis-specific counters, and native
/// rdkafka client statistics.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    /// Rows read, labeled per actor/source/partition/fragment.
    pub partition_input_count: LabelGuardedIntCounterVec,

    // **Note**: for normal messages, the metric is the message's payload size.
    // For messages from load generator, the metric is the size of stream chunk.
    pub partition_input_bytes: LabelGuardedIntCounterVec,
    /// Report latest message id
    pub latest_message_id: LabelGuardedIntGaugeVec,
    /// Native statistics reported by the rdkafka client (see [`RdKafkaStats`]).
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    /// CDC event lag histogram (milliseconds), labeled per table name.
    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    /// Rows set to null (skipped) by the parquet source.
    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    /// Rows read by the file source.
    pub file_source_input_row_count: LabelGuardedIntCounterVec,
    /// Current number of dirty file splits in the file source.
    pub file_source_dirty_split_count: LabelGuardedIntGaugeVec,
    /// Total file splits marked dirty in the file source.
    pub file_source_failed_split_count: LabelGuardedIntCounterVec,

    // kinesis source
    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    /// Kinesis shard lag histogram; presumably milliseconds behind latest —
    /// TODO(review): confirm the unit at the observation site.
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,
}
154
/// Process-wide [`SourceMetrics`], lazily registered against
/// [`GLOBAL_METRICS_REGISTRY`] on first access.
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
157
158impl SourceMetrics {
159    fn new(registry: &Registry) -> Self {
160        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
161            "source_partition_input_count",
162            "Total number of rows that have been input from specific partition",
163            &[
164                "actor_id",
165                "source_id",
166                "partition",
167                "source_name",
168                "fragment_id"
169            ],
170            registry
171        )
172        .unwrap();
173        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
174            "source_partition_input_bytes",
175            "Total bytes that have been input from specific partition",
176            &[
177                "actor_id",
178                "source_id",
179                "partition",
180                "source_name",
181                "fragment_id"
182            ],
183            registry
184        )
185        .unwrap();
186        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
187            "source_latest_message_id",
188            "Latest message id for a exec per partition",
189            &["source_id", "actor_id", "partition"],
190            registry,
191        )
192        .unwrap();
193
194        let opts = histogram_opts!(
195            "source_cdc_event_lag_duration_milliseconds",
196            "source_cdc_lag_latency",
197            exponential_buckets(1.0, 2.0, 21).unwrap(), // max 1048s
198        );
199
200        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
201            "parquet_source_skip_row_count",
202            "Total number of rows that have been set to null in parquet source",
203            &["actor_id", "source_id", "source_name", "fragment_id"],
204            registry
205        )
206        .unwrap();
207
208        let direct_cdc_event_lag_latency =
209            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
210
211        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
212
213        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
214            "file_source_input_row_count",
215            "Total number of rows that have been read in file source",
216            &["source_id", "source_name", "actor_id", "fragment_id"],
217            registry
218        )
219        .unwrap();
220        let file_source_dirty_split_count = register_guarded_int_gauge_vec_with_registry!(
221            "file_source_dirty_split_count",
222            "Current number of dirty file splits in file source",
223            &["source_id", "source_name", "actor_id", "fragment_id"],
224            registry
225        )
226        .unwrap();
227        let file_source_failed_split_count = register_guarded_int_counter_vec_with_registry!(
228            "file_source_failed_split_count",
229            "Total number of file splits marked dirty in file source",
230            &["source_id", "source_name", "actor_id", "fragment_id"],
231            registry
232        )
233        .unwrap();
234
235        let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
236            "kinesis_throughput_exceeded_count",
237            "Total number of times throughput exceeded in kinesis source",
238            &["source_id", "source_name", "fragment_id", "shard_id"],
239            registry
240        )
241        .unwrap();
242
243        let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
244            "kinesis_timeout_count",
245            "Total number of times timeout in kinesis source",
246            &["source_id", "source_name", "fragment_id", "shard_id"],
247            registry
248        )
249        .unwrap();
250
251        let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
252            "kinesis_rebuild_shard_iter_count",
253            "Total number of times rebuild shard iter in kinesis source",
254            &["source_id", "source_name", "fragment_id", "shard_id"],
255            registry
256        )
257        .unwrap();
258
259        let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
260            "kinesis_early_terminate_shard_count",
261            "Total number of times early terminate shard in kinesis source",
262            &["source_id", "source_name", "fragment_id", "shard_id"],
263            registry
264        )
265        .unwrap();
266
267        let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
268            "kinesis_lag_latency_ms",
269            "Lag latency in kinesis source",
270            &["source_id", "source_name", "fragment_id", "shard_id"],
271            registry
272        )
273        .unwrap();
274
275        SourceMetrics {
276            partition_input_count,
277            partition_input_bytes,
278            latest_message_id,
279            rdkafka_native_metric,
280            direct_cdc_event_lag_latency,
281            parquet_source_skip_row_count,
282            file_source_input_row_count,
283            file_source_dirty_split_count,
284            file_source_failed_split_count,
285
286            kinesis_throughput_exceeded_count,
287            kinesis_timeout_count,
288            kinesis_rebuild_shard_iter_count,
289            kinesis_early_terminate_shard_count,
290            kinesis_lag_latency_ms,
291        }
292    }
293}
294
295impl Default for SourceMetrics {
296    fn default() -> Self {
297        GLOBAL_SOURCE_METRICS.clone()
298    }
299}