1use std::sync::LazyLock;
16
17use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
18use prometheus::{
19 Histogram, HistogramVec, IntGauge, Registry, exponential_buckets, histogram_opts,
20 register_histogram_vec_with_registry, register_histogram_with_registry,
21 register_int_counter_vec_with_registry, register_int_counter_with_registry,
22 register_int_gauge_with_registry,
23};
24use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
25
26#[derive(Debug, Clone)]
27pub struct CompactorMetrics {
28 pub compaction_upload_sst_counts: GenericCounter<AtomicU64>,
29 pub compact_fast_runner_bytes: GenericCounter<AtomicU64>,
30 pub compact_write_bytes: GenericCounterVec<AtomicU64>,
31 pub compact_read_current_level: GenericCounterVec<AtomicU64>,
32 pub compact_read_next_level: GenericCounterVec<AtomicU64>,
33 pub compact_read_sstn_current_level: GenericCounterVec<AtomicU64>,
34 pub compact_read_sstn_next_level: GenericCounterVec<AtomicU64>,
35 pub compact_write_sstn: GenericCounterVec<AtomicU64>,
36 pub compact_sst_duration: Histogram,
37 pub compact_task_duration: HistogramVec,
38 pub compact_task_pending_num: IntGauge,
39 pub compact_task_pending_parallelism: IntGauge,
40 pub write_build_l0_sst_duration: Histogram,
41 pub shared_buffer_to_sstable_size: Histogram,
42 pub get_table_id_total_time_duration: Histogram,
43 pub remote_read_time: Histogram,
44 pub sstable_bloom_filter_size: Histogram,
45 pub sstable_file_size: Histogram,
46 pub sstable_avg_key_size: Histogram,
47 pub sstable_avg_value_size: Histogram,
48 pub iter_scan_key_counts: GenericCounterVec<AtomicU64>,
49 pub write_build_l0_bytes: GenericCounter<AtomicU64>,
50 pub sstable_distinct_epoch_count: Histogram,
51 pub compaction_event_consumed_latency: Histogram,
52 pub compaction_event_loop_iteration_latency: Histogram,
53 pub sstable_block_size: Histogram,
54}
55
56pub static GLOBAL_COMPACTOR_METRICS: LazyLock<CompactorMetrics> =
57 LazyLock::new(|| CompactorMetrics::new(&GLOBAL_METRICS_REGISTRY));
58
59impl CompactorMetrics {
60 fn new(registry: &Registry) -> Self {
61 let size_buckets = exponential_buckets(256.0, 16.0, 7).unwrap();
63 let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
65 let opts = histogram_opts!(
66 "compactor_shared_buffer_to_sstable_size",
67 "Histogram of batch size compacted from shared buffer to remote storage",
68 size_buckets.clone()
69 );
70 let shared_buffer_to_sstable_size =
71 register_histogram_with_registry!(opts, registry).unwrap();
72
73 let compaction_upload_sst_counts = register_int_counter_with_registry!(
74 "compactor_compaction_upload_sst_counts",
75 "Total number of sst uploads during compaction",
76 registry
77 )
78 .unwrap();
79
80 let opts = histogram_opts!(
81 "compactor_compact_sst_duration",
82 "Total time of compact_key_range that have been issued to state store",
83 time_buckets.clone()
84 );
85 let compact_sst_duration = register_histogram_with_registry!(opts, registry).unwrap();
86 let opts = histogram_opts!(
87 "compactor_compact_task_duration",
88 "Total time of compact that have been issued to state store",
89 time_buckets.clone()
90 );
91 let compact_task_duration =
92 register_histogram_vec_with_registry!(opts, &["group", "level"], registry).unwrap();
93
94 let opts = histogram_opts!(
95 "compactor_get_table_id_total_time_duration",
96 "Total time of compact that have been issued to state store",
97 time_buckets.clone()
98 );
99 let get_table_id_total_time_duration =
100 register_histogram_with_registry!(opts, registry).unwrap();
101
102 let opts = histogram_opts!(
103 "compactor_remote_read_time",
104 "Total time of operations which read from remote storage when enable prefetch",
105 time_buckets.clone()
106 );
107 let remote_read_time = register_histogram_with_registry!(opts, registry).unwrap();
108
109 let compact_read_current_level = register_int_counter_vec_with_registry!(
110 "storage_level_compact_read_curr",
111 "KBs read from current level during history compactions to next level",
112 &["group", "level_index"],
113 registry
114 )
115 .unwrap();
116
117 let compact_read_next_level = register_int_counter_vec_with_registry!(
118 "storage_level_compact_read_next",
119 "KBs read from next level during history compactions to next level",
120 &["group", "level_index"],
121 registry
122 )
123 .unwrap();
124
125 let compact_write_bytes = register_int_counter_vec_with_registry!(
126 "storage_level_compact_write",
127 "KBs written into next level during history compactions to next level",
128 &["group", "level_index"],
129 registry
130 )
131 .unwrap();
132
133 let compact_read_sstn_current_level = register_int_counter_vec_with_registry!(
134 "storage_level_compact_read_sstn_curr",
135 "num of SSTs read from current level during history compactions to next level",
136 &["group", "level_index"],
137 registry
138 )
139 .unwrap();
140
141 let compact_read_sstn_next_level = register_int_counter_vec_with_registry!(
142 "storage_level_compact_read_sstn_next",
143 "num of SSTs read from next level during history compactions to next level",
144 &["group", "level_index"],
145 registry
146 )
147 .unwrap();
148
149 let compact_write_sstn = register_int_counter_vec_with_registry!(
150 "storage_level_compact_write_sstn",
151 "num of SSTs written into next level during history compactions to next level",
152 &["group", "level_index"],
153 registry
154 )
155 .unwrap();
156
157 let compact_task_pending_num = register_int_gauge_with_registry!(
158 "storage_compact_task_pending_num",
159 "the num of storage compact task",
160 registry
161 )
162 .unwrap();
163
164 let compact_task_pending_parallelism = register_int_gauge_with_registry!(
165 "storage_compact_task_pending_parallelism",
166 "the num of storage compact parallelism",
167 registry
168 )
169 .unwrap();
170
171 let opts = histogram_opts!(
172 "compactor_sstable_bloom_filter_size",
173 "Total bytes gotten from sstable_bloom_filter, for observing bloom_filter size",
174 exponential_buckets(16.0, 16.0, 7).unwrap() );
176
177 let sstable_bloom_filter_size = register_histogram_with_registry!(opts, registry).unwrap();
178
179 let opts = histogram_opts!(
180 "compactor_sstable_file_size",
181 "Total bytes gotten from sstable_file_size, for observing sstable_file_size",
182 size_buckets.clone()
183 );
184
185 let sstable_file_size = register_histogram_with_registry!(opts, registry).unwrap();
186
187 let opts = histogram_opts!(
188 "compactor_sstable_avg_key_size",
189 "Total bytes gotten from sstable_avg_key_size, for observing sstable_avg_key_size",
190 size_buckets.clone()
191 );
192
193 let sstable_avg_key_size = register_histogram_with_registry!(opts, registry).unwrap();
194
195 let opts = histogram_opts!(
196 "compactor_sstable_avg_value_size",
197 "Total bytes gotten from sstable_avg_value_size, for observing sstable_avg_value_size",
198 size_buckets.clone()
199 );
200
201 let sstable_avg_value_size = register_histogram_with_registry!(opts, registry).unwrap();
202
203 let opts = histogram_opts!(
204 "state_store_write_build_l0_sst_duration",
205 "Total time of batch_write_build_table that have been issued to state store",
206 time_buckets.clone()
207 );
208 let write_build_l0_sst_duration =
209 register_histogram_with_registry!(opts, registry).unwrap();
210
211 let iter_scan_key_counts = register_int_counter_vec_with_registry!(
212 "compactor_iter_scan_key_counts",
213 "Total number of keys read by iterator",
214 &["type"],
215 registry
216 )
217 .unwrap();
218
219 let write_build_l0_bytes = register_int_counter_with_registry!(
220 "compactor_write_build_l0_bytes",
221 "Total size of compaction files size that have been written to object store from shared buffer",
222 registry
223 ).unwrap();
224 let compact_fast_runner_bytes = register_int_counter_with_registry!(
225 "compactor_fast_compact_bytes",
226 "Total size of compaction files size of fast compactor runner",
227 registry
228 )
229 .unwrap();
230 let opts = histogram_opts!(
231 "compactor_sstable_distinct_epoch_count",
232 "Total number gotten from sstable_distinct_epoch_count, for observing sstable_distinct_epoch_count",
233 exponential_buckets(1.0, 10.0, 6).unwrap()
234 );
235
236 let sstable_distinct_epoch_count =
237 register_histogram_with_registry!(opts, registry).unwrap();
238
239 let opts = histogram_opts!(
240 "compactor_compaction_event_consumed_latency",
241 "The latency of each event being consumed",
242 time_buckets.clone()
243 );
244 let compaction_event_consumed_latency =
245 register_histogram_with_registry!(opts, registry).unwrap();
246
247 let opts = histogram_opts!(
248 "compactor_compaction_event_loop_iteration_latency",
249 "The latency of each iteration of the compaction event loop",
250 time_buckets
251 );
252 let compaction_event_loop_iteration_latency =
253 register_histogram_with_registry!(opts, registry).unwrap();
254
255 let opts = histogram_opts!(
256 "compactor_sstable_block_size",
257 "Total bytes gotten from sstable_block_size, for observing sstable_block_size",
258 size_buckets,
259 );
260
261 let sstable_block_size = register_histogram_with_registry!(opts, registry).unwrap();
262
263 Self {
264 compaction_upload_sst_counts,
265 compact_fast_runner_bytes,
266 compact_write_bytes,
267 compact_read_current_level,
268 compact_read_next_level,
269 compact_read_sstn_current_level,
270 compact_read_sstn_next_level,
271 compact_write_sstn,
272 compact_sst_duration,
273 compact_task_duration,
274 compact_task_pending_num,
275 compact_task_pending_parallelism,
276 write_build_l0_sst_duration,
277 shared_buffer_to_sstable_size,
278 get_table_id_total_time_duration,
279 remote_read_time,
280 sstable_bloom_filter_size,
281 sstable_file_size,
282 sstable_avg_key_size,
283 sstable_avg_value_size,
284 iter_scan_key_counts,
285 write_build_l0_bytes,
286 sstable_distinct_epoch_count,
287 compaction_event_consumed_latency,
288 compaction_event_loop_iteration_latency,
289 sstable_block_size,
290 }
291 }
292
293 pub fn unused() -> Self {
295 GLOBAL_COMPACTOR_METRICS.clone()
296 }
297}