risingwave_connector/source/iceberg/
metrics.rs1use std::sync::{Arc, LazyLock};
16
17use prometheus::{Registry, exponential_buckets, histogram_opts};
18use risingwave_common::metrics::{
19 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
20};
21use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
22use risingwave_common::{
23 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
24 register_guarded_int_gauge_vec_with_registry,
25};
26
27#[derive(Clone)]
28pub struct IcebergScanMetrics {
29 pub iceberg_read_bytes: LabelGuardedIntCounterVec,
31
32 pub iceberg_source_snapshot_lag_seconds: LabelGuardedIntGaugeVec,
36
37 pub iceberg_source_snapshots_discovered_total: LabelGuardedIntCounterVec,
39
40 pub iceberg_source_list_duration_seconds: LabelGuardedHistogramVec,
42
43 pub iceberg_source_files_discovered_total: LabelGuardedIntCounterVec,
45
46 pub iceberg_source_file_read_duration_seconds: LabelGuardedHistogramVec,
49
50 pub iceberg_source_rows_read_total: LabelGuardedIntCounterVec,
52
53 pub iceberg_source_files_read_total: LabelGuardedIntCounterVec,
55
56 pub iceberg_source_delete_rows_applied_total: LabelGuardedIntCounterVec,
59
60 pub iceberg_source_delete_files_per_data_file: LabelGuardedHistogramVec,
62
63 pub iceberg_source_inflight_file_count: LabelGuardedIntGaugeVec,
66
67 pub iceberg_source_scan_errors_total: LabelGuardedIntCounterVec,
69}
70
71impl IcebergScanMetrics {
72 fn new(registry: &Registry) -> Self {
73 let iceberg_read_bytes = register_guarded_int_counter_vec_with_registry!(
74 "iceberg_read_bytes",
75 "Total size of iceberg read requests",
76 &["table_name"],
77 registry
78 )
79 .unwrap();
80
81 let iceberg_source_snapshot_lag_seconds = register_guarded_int_gauge_vec_with_registry!(
82 "iceberg_source_snapshot_lag_seconds",
83 "Lag between latest available snapshot and last ingested snapshot in seconds",
84 &["source_id", "source_name", "table_name"],
85 registry
86 )
87 .unwrap();
88
89 let iceberg_source_snapshots_discovered_total =
90 register_guarded_int_counter_vec_with_registry!(
91 "iceberg_source_snapshots_discovered_total",
92 "Total number of snapshots discovered via incremental scan",
93 &["source_id", "source_name", "table_name"],
94 registry
95 )
96 .unwrap();
97
98 let iceberg_source_list_duration_seconds = register_guarded_histogram_vec_with_registry!(
99 histogram_opts!(
100 "iceberg_source_list_duration_seconds",
101 "Time spent planning files from a snapshot",
102 exponential_buckets(0.01, 2.0, 15).unwrap() ),
104 &["source_id", "source_name", "table_name"],
105 registry
106 )
107 .unwrap();
108
109 let iceberg_source_files_discovered_total =
110 register_guarded_int_counter_vec_with_registry!(
111 "iceberg_source_files_discovered_total",
112 "Total number of files discovered per scan",
113 &["source_id", "source_name", "table_name", "file_type"],
114 registry
115 )
116 .unwrap();
117
118 let iceberg_source_file_read_duration_seconds =
121 register_guarded_histogram_vec_with_registry!(
122 histogram_opts!(
123 "iceberg_source_file_read_duration_seconds",
124 "Per-file read duration",
125 exponential_buckets(0.01, 2.0, 15).unwrap() ),
127 &["table_name"],
128 registry
129 )
130 .unwrap();
131
132 let iceberg_source_rows_read_total = register_guarded_int_counter_vec_with_registry!(
133 "iceberg_source_rows_read_total",
134 "Total rows read from Iceberg source",
135 &["table_name"],
136 registry
137 )
138 .unwrap();
139
140 let iceberg_source_files_read_total = register_guarded_int_counter_vec_with_registry!(
141 "iceberg_source_files_read_total",
142 "Total files read from Iceberg source",
143 &["table_name", "file_type"],
144 registry
145 )
146 .unwrap();
147
148 let iceberg_source_delete_rows_applied_total =
149 register_guarded_int_counter_vec_with_registry!(
150 "iceberg_source_delete_rows_applied_total",
151 "Total rows removed by delete processing",
152 &["table_name", "delete_type"],
153 registry
154 )
155 .unwrap();
156
157 let iceberg_source_delete_files_per_data_file =
158 register_guarded_histogram_vec_with_registry!(
159 histogram_opts!(
160 "iceberg_source_delete_files_per_data_file",
161 "Number of delete files attached per data file scan task",
162 exponential_buckets(1.0, 2.0, 8).unwrap()
164 ),
165 &["source_id", "source_name", "table_name"],
166 registry
167 )
168 .unwrap();
169
170 let iceberg_source_inflight_file_count = register_guarded_int_gauge_vec_with_registry!(
171 "iceberg_source_inflight_file_count",
172 "Number of files currently being fetched by the active reader",
173 &["source_id", "source_name", "table_name"],
174 registry
175 )
176 .unwrap();
177
178 let iceberg_source_scan_errors_total = register_guarded_int_counter_vec_with_registry!(
179 "iceberg_source_scan_errors_total",
180 "Total number of scan errors categorized by error type",
181 &["source_id", "source_name", "table_name", "error_type"],
182 registry
183 )
184 .unwrap();
185
186 Self {
187 iceberg_read_bytes,
188 iceberg_source_snapshot_lag_seconds,
189 iceberg_source_snapshots_discovered_total,
190 iceberg_source_list_duration_seconds,
191 iceberg_source_files_discovered_total,
192 iceberg_source_file_read_duration_seconds,
193 iceberg_source_rows_read_total,
194 iceberg_source_files_read_total,
195 iceberg_source_delete_rows_applied_total,
196 iceberg_source_delete_files_per_data_file,
197 iceberg_source_inflight_file_count,
198 iceberg_source_scan_errors_total,
199 }
200 }
201
202 pub fn for_test() -> Arc<Self> {
203 Arc::new(GLOBAL_ICEBERG_SCAN_METRICS.clone())
204 }
205}
206
207pub static GLOBAL_ICEBERG_SCAN_METRICS: LazyLock<IcebergScanMetrics> =
208 LazyLock::new(|| IcebergScanMetrics::new(&GLOBAL_METRICS_REGISTRY));