risingwave_connector/source/monitor/
metrics.rs1use std::sync::{Arc, LazyLock};
16
17use prometheus::{Registry, exponential_buckets, histogram_opts};
18use risingwave_common::metrics::{
19 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
20};
21use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
22use risingwave_common::{
23 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
24 register_guarded_int_gauge_vec_with_registry,
25};
26
27use crate::source::kafka::stats::RdKafkaStats;
28
29#[derive(Debug, Clone)]
30pub struct EnumeratorMetrics {
31 pub high_watermark: LabelGuardedIntGaugeVec,
32 pub pg_cdc_confirmed_flush_lsn: LabelGuardedIntGaugeVec,
34}
35
36pub static GLOBAL_ENUMERATOR_METRICS: LazyLock<EnumeratorMetrics> =
37 LazyLock::new(|| EnumeratorMetrics::new(&GLOBAL_METRICS_REGISTRY));
38
39impl EnumeratorMetrics {
40 fn new(registry: &Registry) -> Self {
41 let high_watermark = register_guarded_int_gauge_vec_with_registry!(
42 "source_kafka_high_watermark",
43 "High watermark for a exec per partition",
44 &["source_id", "partition"],
45 registry,
46 )
47 .unwrap();
48
49 let pg_cdc_confirmed_flush_lsn = register_guarded_int_gauge_vec_with_registry!(
50 "pg_cdc_confirmed_flush_lsn",
51 "PostgreSQL CDC confirmed flush LSN",
52 &["source_id", "slot_name"],
53 registry,
54 )
55 .unwrap();
56
57 EnumeratorMetrics {
58 high_watermark,
59 pg_cdc_confirmed_flush_lsn,
60 }
61 }
62
63 pub fn unused() -> Self {
64 Default::default()
65 }
66}
67
68impl Default for EnumeratorMetrics {
69 fn default() -> Self {
70 GLOBAL_ENUMERATOR_METRICS.clone()
71 }
72}
73
74#[derive(Debug, Clone)]
75pub struct SourceMetrics {
76 pub partition_input_count: LabelGuardedIntCounterVec,
77
78 pub partition_input_bytes: LabelGuardedIntCounterVec,
81 pub latest_message_id: LabelGuardedIntGaugeVec,
83 pub rdkafka_native_metric: Arc<RdKafkaStats>,
84
85 pub direct_cdc_event_lag_latency: LabelGuardedHistogramVec,
86
87 pub parquet_source_skip_row_count: LabelGuardedIntCounterVec,
88 pub file_source_input_row_count: LabelGuardedIntCounterVec,
89}
90
91pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
92 LazyLock::new(|| SourceMetrics::new(&GLOBAL_METRICS_REGISTRY));
93
94impl SourceMetrics {
95 fn new(registry: &Registry) -> Self {
96 let partition_input_count = register_guarded_int_counter_vec_with_registry!(
97 "source_partition_input_count",
98 "Total number of rows that have been input from specific partition",
99 &[
100 "actor_id",
101 "source_id",
102 "partition",
103 "source_name",
104 "fragment_id"
105 ],
106 registry
107 )
108 .unwrap();
109 let partition_input_bytes = register_guarded_int_counter_vec_with_registry!(
110 "source_partition_input_bytes",
111 "Total bytes that have been input from specific partition",
112 &[
113 "actor_id",
114 "source_id",
115 "partition",
116 "source_name",
117 "fragment_id"
118 ],
119 registry
120 )
121 .unwrap();
122 let latest_message_id = register_guarded_int_gauge_vec_with_registry!(
123 "source_latest_message_id",
124 "Latest message id for a exec per partition",
125 &["source_id", "actor_id", "partition"],
126 registry,
127 )
128 .unwrap();
129
130 let opts = histogram_opts!(
131 "source_cdc_event_lag_duration_milliseconds",
132 "source_cdc_lag_latency",
133 exponential_buckets(1.0, 2.0, 21).unwrap(), );
135
136 let parquet_source_skip_row_count = register_guarded_int_counter_vec_with_registry!(
137 "parquet_source_skip_row_count",
138 "Total number of rows that have been set to null in parquet source",
139 &["actor_id", "source_id", "source_name", "fragment_id"],
140 registry
141 )
142 .unwrap();
143
144 let direct_cdc_event_lag_latency =
145 register_guarded_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();
146
147 let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
148
149 let file_source_input_row_count = register_guarded_int_counter_vec_with_registry!(
150 "file_source_input_row_count",
151 "Total number of rows that have been read in file source",
152 &["source_id", "source_name", "actor_id", "fragment_id"],
153 registry
154 )
155 .unwrap();
156 SourceMetrics {
157 partition_input_count,
158 partition_input_bytes,
159 latest_message_id,
160 rdkafka_native_metric,
161 direct_cdc_event_lag_latency,
162 parquet_source_skip_row_count,
163 file_source_input_row_count,
164 }
165 }
166}
167
168impl Default for SourceMetrics {
169 fn default() -> Self {
170 GLOBAL_SOURCE_METRICS.clone()
171 }
172}