risingwave_connector/source/monitor/
metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, LazyLock};
16
17use prometheus::{Registry, exponential_buckets, histogram_opts};
18use risingwave_common::metrics::{
19    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
20};
21use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
22use risingwave_common::{
23    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
24    register_guarded_int_gauge_vec_with_registry,
25};
26
27use crate::source::kafka::stats::RdKafkaStats;
28
/// Metrics reported by source enumerators.
#[derive(Debug, Clone)]
pub struct EnumeratorMetrics {
    // Kafka high watermark offset, labeled by ("source_id", "partition").
    pub high_watermark: LabelGuardedIntGaugeVec<2>,
}
33
/// Process-wide [`EnumeratorMetrics`] instance, lazily registered in the
/// global metrics registry on first access.
pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
    LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
36
37impl EnumeratorMetrics {
38    fn new(registry: &Registry) -> Self {
39        let high_watermark = register_guarded_int_gauge_vec_with_registry!(
40            "source_kafka_high_watermark",
41            "High watermark for a exec per partition",
42            &["source_id", "partition"],
43            registry,
44        )
45        .unwrap();
46        EnumeratorMetrics { high_watermark }
47    }
48
49    pub fn unused() -> Self {
50        Default::default()
51    }
52}
53
54impl Default for EnumeratorMetrics {
55    fn default() -> Self {
56        GLOBAL_ENUMERATOR_METRICS.clone()
57    }
58}
59
/// Metrics reported by source executors while reading data.
#[derive(Debug, Clone)]
pub struct SourceMetrics {
    // Rows read per partition, labeled by
    // ("actor_id", "source_id", "partition", "source_name", "fragment_id").
    pub partition_input_count: LabelGuardedIntCounterVec<5>,

    // **Note**: for normal messages, the metric is the message's payload size.
    // For messages from load generator, the metric is the size of stream chunk.
    pub partition_input_bytes: LabelGuardedIntCounterVec<5>,
    /// Report latest message id,
    /// labeled by ("source_id", "actor_id", "partition").
    pub latest_message_id: LabelGuardedIntGaugeVec<3>,
    // Native librdkafka statistics, registered separately (see `new`).
    pub rdkafka_native_metric: Arc<RdKafkaStats>,

    // CDC event lag histogram in milliseconds, labeled by ("table_name").
    pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec<1>,

    // Rows set to null when reading parquet, labeled by
    // ("actor_id", "source_id", "source_name", "fragment_id").
    pub parquet_source_skip_row_count: LabelGuardedIntCounterVec<4>,
    // Rows read from file sources, labeled by
    // ("source_id", "source_name", "actor_id", "fragment_id").
    pub file_source_input_row_count: LabelGuardedIntCounterVec<4>,
}
76
/// Process-wide [`SourceMetrics`] instance, lazily registered in the
/// global metrics registry on first access.
pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
    LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
79
80impl SourceMetrics {
81    fn new(registry: &Registry) -> Self {
82        let partition_input_count = register_guarded_int_counter_vec_with_registry!(
83            "source_partition_input_count",
84            "Total number of rows that have been input from specific partition",
85            &[
86                "actor_id",
87                "source_id",
88                "partition",
89                "source_name",
90                "fragment_id"
91            ],
92            registry
93        )
94        .unwrap();
95        let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
96            "source_partition_input_bytes",
97            "Total bytes that have been input from specific partition",
98            &[
99                "actor_id",
100                "source_id",
101                "partition",
102                "source_name",
103                "fragment_id"
104            ],
105            registry
106        )
107        .unwrap();
108        let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
109            "source_latest_message_id",
110            "Latest message id for a exec per partition",
111            &["source_id", "actor_id", "partition"],
112            registry,
113        )
114        .unwrap();
115
116        let opts = histogram_opts!(
117            "source_cdc_event_lag_duration_milliseconds",
118            "source_cdc_lag_latency",
119            exponential_buckets(1.0, 2.0, 21).unwrap(), // max 1048s
120        );
121
122        let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
123            "parquet_source_skip_row_count",
124            "Total number of rows that have been set to null in parquet source",
125            &["actor_id", "source_id", "source_name", "fragment_id"],
126            registry
127        )
128        .unwrap();
129
130        let direct_cdc_event_lag_latency =
131            register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
132
133        let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
134
135        let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
136            "file_source_input_row_count",
137            "Total number of rows that have been read in file source",
138            &["source_id", "source_name", "actor_id", "fragment_id"],
139            registry
140        )
141        .unwrap();
142        SourceMetrics {
143            partition_input_count,
144            partition_input_bytes,
145            latest_message_id,
146            rdkafka_native_metric,
147            direct_cdc_event_lag_latency,
148            parquet_source_skip_row_count,
149            file_source_input_row_count,
150        }
151    }
152}
153
154impl Default for SourceMetrics {
155    fn default() -> Self {
156        GLOBAL_SOURCE_METRICS.clone()
157    }
158}