risingwave_connector/source/monitor/
metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, LazyLock};
16
17use prometheus::{Registry, exponential_buckets, histogram_opts};
18use risingwave_common::metrics::{
19    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
20};
21use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
22use risingwave_common::{
23    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
24    register_guarded_int_gauge_vec_with_registry,
25};
26
27use crate::source::kafka::stats::RdKafkaStats;
28
/// Metrics reported by source enumerators (split discovery), as opposed to the
/// per-actor reader metrics in [`SourceMetrics`].
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    /// Kafka high watermark, labelled by `source_id` and `partition`
    /// (registered as `source_kafka_high_watermark`).
    pub high_watermark: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC confirmed flush LSN monitoring, labelled by `source_id`
    /// and `slot_name`.
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
}
35
/// Process-wide [`EnumeratorMetrics`], lazily registered against the global
/// metrics registry on first access.
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
38
39impl EnumeratorMetrics {
40    fn new(registry: &Registry) -> Self {
41        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
42            "source_kafka_high_watermark",
43            "High watermark for a exec per partition",
44            &["source_id", "partition"],
45            registry,
46        )
47        .unwrap();
48
49        let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
50            "pg_cdc_confirmed_flush_lsn",
51            "PostgreSQL CDC confirmed flush LSN",
52            &["source_id", "slot_name"],
53            registry,
54        )
55        .unwrap();
56
57        EnumeratorMetrics {
58            high_watermark,
59            pg_cdc_confirmed_flush_lsn,
60        }
61    }
62
63    pub fn unused() -> Self {
64        Default::default()
65    }
66}
67
68impl Default for EnumeratorMetrics {
69    fn default() -> Self {
70        GLOBAL_ENUMERATOR_METRICS.clone()
71    }
72}
73
/// Per-source-reader metrics, registered once in [`GLOBAL_SOURCE_METRICS`] and
/// cloned into each consumer via [`Default`].
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    /// Rows read per partition, labelled by actor/source/partition/fragment.
    pub partition_input_count: LabelGuardedIntCounterVec,

    // **Note**: for normal messages, the metric is the message's payload size.
    // For messages from load generator, the metric is the size of stream chunk.
    pub partition_input_bytes: LabelGuardedIntCounterVec,
    /// Report latest message id
    pub latest_message_id: LabelGuardedIntGaugeVec,
    /// Native librdkafka statistics, registered via [`RdKafkaStats`].
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    /// CDC event lag histogram (`source_cdc_event_lag_duration_milliseconds`),
    /// labelled by `table_name`.
    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    /// Rows set to null (skipped) while reading parquet files.
    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    /// Rows read from file sources.
    pub file_source_input_row_count: LabelGuardedIntCounterVec,

    // kinesis source
    pub kinesis_throughput_exceeded_count: LabelGuardedIntCounterVec,
    pub kinesis_timeout_count: LabelGuardedIntCounterVec,
    pub kinesis_rebuild_shard_iter_count: LabelGuardedIntCounterVec,
    pub kinesis_early_terminate_shard_count: LabelGuardedIntCounterVec,
    pub kinesis_lag_latency_ms: LabelGuardedHistogramVec,
}
97
/// Process-wide [`SourceMetrics`], lazily registered against the global
/// metrics registry on first access.
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
100
101impl SourceMetrics {
102    fn new(registry: &Registry) -> Self {
103        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
104            "source_partition_input_count",
105            "Total number of rows that have been input from specific partition",
106            &[
107                "actor_id",
108                "source_id",
109                "partition",
110                "source_name",
111                "fragment_id"
112            ],
113            registry
114        )
115        .unwrap();
116        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
117            "source_partition_input_bytes",
118            "Total bytes that have been input from specific partition",
119            &[
120                "actor_id",
121                "source_id",
122                "partition",
123                "source_name",
124                "fragment_id"
125            ],
126            registry
127        )
128        .unwrap();
129        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
130            "source_latest_message_id",
131            "Latest message id for a exec per partition",
132            &["source_id", "actor_id", "partition"],
133            registry,
134        )
135        .unwrap();
136
137        let opts = histogram_opts!(
138            "source_cdc_event_lag_duration_milliseconds",
139            "source_cdc_lag_latency",
140            exponential_buckets(1.0, 2.0, 21).unwrap(), // max 1048s
141        );
142
143        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
144            "parquet_source_skip_row_count",
145            "Total number of rows that have been set to null in parquet source",
146            &["actor_id", "source_id", "source_name", "fragment_id"],
147            registry
148        )
149        .unwrap();
150
151        let direct_cdc_event_lag_latency =
152            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
153
154        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
155
156        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
157            "file_source_input_row_count",
158            "Total number of rows that have been read in file source",
159            &["source_id", "source_name", "actor_id", "fragment_id"],
160            registry
161        )
162        .unwrap();
163
164        let kinesis_throughput_exceeded_count = register_guarded_int_counter_vec_with_registry!(
165            "kinesis_throughput_exceeded_count",
166            "Total number of times throughput exceeded in kinesis source",
167            &["source_id", "source_name", "fragment_id", "shard_id"],
168            registry
169        )
170        .unwrap();
171
172        let kinesis_timeout_count = register_guarded_int_counter_vec_with_registry!(
173            "kinesis_timeout_count",
174            "Total number of times timeout in kinesis source",
175            &["source_id", "source_name", "fragment_id", "shard_id"],
176            registry
177        )
178        .unwrap();
179
180        let kinesis_rebuild_shard_iter_count = register_guarded_int_counter_vec_with_registry!(
181            "kinesis_rebuild_shard_iter_count",
182            "Total number of times rebuild shard iter in kinesis source",
183            &["source_id", "source_name", "fragment_id", "shard_id"],
184            registry
185        )
186        .unwrap();
187
188        let kinesis_early_terminate_shard_count = register_guarded_int_counter_vec_with_registry!(
189            "kinesis_early_terminate_shard_count",
190            "Total number of times early terminate shard in kinesis source",
191            &["source_id", "source_name", "fragment_id", "shard_id"],
192            registry
193        )
194        .unwrap();
195
196        let kinesis_lag_latency_ms = register_guarded_histogram_vec_with_registry!(
197            "kinesis_lag_latency_ms",
198            "Lag latency in kinesis source",
199            &["source_id", "source_name", "fragment_id", "shard_id"],
200            registry
201        )
202        .unwrap();
203
204        SourceMetrics {
205            partition_input_count,
206            partition_input_bytes,
207            latest_message_id,
208            rdkafka_native_metric,
209            direct_cdc_event_lag_latency,
210            parquet_source_skip_row_count,
211            file_source_input_row_count,
212
213            kinesis_throughput_exceeded_count,
214            kinesis_timeout_count,
215            kinesis_rebuild_shard_iter_count,
216            kinesis_early_terminate_shard_count,
217            kinesis_lag_latency_ms,
218        }
219    }
220}
221
222impl Default for SourceMetrics {
223    fn default() -> Self {
224        GLOBAL_SOURCE_METRICS.clone()
225    }
226}