risingwave_connector/source/monitor/
metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, LazyLock};
16
17use prometheus::{Registry, exponential_buckets, histogram_opts};
18use risingwave_common::metrics::{
19    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
20};
21use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
22use risingwave_common::{
23    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
24    register_guarded_int_gauge_vec_with_registry,
25};
26
27use crate::source::kafka::stats::RdKafkaStats;
28
/// Metrics reported by split enumerators (run on the meta side of a source).
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    /// Kafka high watermark, labeled by `source_id` and `partition`
    /// (`source_kafka_high_watermark`).
    pub high_watermark: LabelGuardedIntGaugeVec,
    /// PostgreSQL CDC confirmed flush LSN monitoring
    /// (`pg_cdc_confirmed_flush_lsn`, labeled by `source_id` and `slot_name`).
    pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
}
35
/// Process-wide [`EnumeratorMetrics`], lazily registered on
/// [`GLOBAL_METRICS_REGISTRY`] on first access.
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
38
39impl EnumeratorMetrics {
40    fn new(registry: &Registry) -> Self {
41        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
42            "source_kafka_high_watermark",
43            "High watermark for a exec per partition",
44            &["source_id", "partition"],
45            registry,
46        )
47        .unwrap();
48
49        let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
50            "pg_cdc_confirmed_flush_lsn",
51            "PostgreSQL CDC confirmed flush LSN",
52            &["source_id", "slot_name"],
53            registry,
54        )
55        .unwrap();
56
57        EnumeratorMetrics {
58            high_watermark,
59            pg_cdc_confirmed_flush_lsn,
60        }
61    }
62
63    pub fn unused() -> Self {
64        Default::default()
65    }
66}
67
68impl Default for EnumeratorMetrics {
69    fn default() -> Self {
70        GLOBAL_ENUMERATOR_METRICS.clone()
71    }
72}
73
/// Metrics reported by source executors/readers at runtime.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    /// Rows read, labeled by actor, source, partition, source name and fragment
    /// (`source_partition_input_count`).
    pub partition_input_count: LabelGuardedIntCounterVec,

    // **Note**: for normal messages, the metric is the message's payload size.
    // For messages from load generator, the metric is the size of stream chunk.
    pub partition_input_bytes: LabelGuardedIntCounterVec,
    /// Report latest message id
    pub latest_message_id: LabelGuardedIntGaugeVec,
    /// Statistics from librdkafka, registered on the same registry via
    /// `RdKafkaStats::new` — presumably librdkafka's native stats callback;
    /// see that type for details.
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    /// CDC event lag histogram in milliseconds, labeled by `table_name`
    /// (`source_cdc_event_lag_duration_milliseconds`).
    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,

    /// Rows set to null (skipped) while reading parquet files.
    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
    /// Rows read by the file source.
    pub file_source_input_row_count: LabelGuardedIntCounterVec,
}
90
/// Process-wide [`SourceMetrics`], lazily registered on
/// [`GLOBAL_METRICS_REGISTRY`] on first access.
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
93
94impl SourceMetrics {
95    fn new(registry: &Registry) -> Self {
96        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
97            "source_partition_input_count",
98            "Total number of rows that have been input from specific partition",
99            &[
100                "actor_id",
101                "source_id",
102                "partition",
103                "source_name",
104                "fragment_id"
105            ],
106            registry
107        )
108        .unwrap();
109        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
110            "source_partition_input_bytes",
111            "Total bytes that have been input from specific partition",
112            &[
113                "actor_id",
114                "source_id",
115                "partition",
116                "source_name",
117                "fragment_id"
118            ],
119            registry
120        )
121        .unwrap();
122        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
123            "source_latest_message_id",
124            "Latest message id for a exec per partition",
125            &["source_id", "actor_id", "partition"],
126            registry,
127        )
128        .unwrap();
129
130        let opts = histogram_opts!(
131            "source_cdc_event_lag_duration_milliseconds",
132            "source_cdc_lag_latency",
133            exponential_buckets(1.0, 2.0, 21).unwrap(), // max 1048s
134        );
135
136        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
137            "parquet_source_skip_row_count",
138            "Total number of rows that have been set to null in parquet source",
139            &["actor_id", "source_id", "source_name", "fragment_id"],
140            registry
141        )
142        .unwrap();
143
144        let direct_cdc_event_lag_latency =
145            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
146
147        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
148
149        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
150            "file_source_input_row_count",
151            "Total number of rows that have been read in file source",
152            &["source_id", "source_name", "actor_id", "fragment_id"],
153            registry
154        )
155        .unwrap();
156        SourceMetrics {
157            partition_input_count,
158            partition_input_bytes,
159            latest_message_id,
160            rdkafka_native_metric,
161            direct_cdc_event_lag_latency,
162            parquet_source_skip_row_count,
163            file_source_input_row_count,
164        }
165    }
166}
167
168impl Default for SourceMetrics {
169    fn default() -> Self {
170        GLOBAL_SOURCE_METRICS.clone()
171    }
172}