risingwave_storage/monitor/compactor_metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
18use prometheus::{
19    Histogram, HistogramVec, IntGauge, Registry, exponential_buckets, histogram_opts,
20    register_histogram_vec_with_registry, register_histogram_with_registry,
21    register_int_counter_vec_with_registry, register_int_counter_with_registry,
22    register_int_gauge_with_registry,
23};
24use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
25
/// Prometheus metrics recorded by the compactor.
///
/// All handles are registered in `CompactorMetrics::new`; each field's doc names
/// the exported metric (and its label names, where it is a `*Vec`).
#[derive(Debug, Clone)]
pub struct CompactorMetrics {
    /// Counter `compactor_compaction_upload_sst_counts`.
    pub compaction_upload_sst_counts: GenericCounter<AtomicU64>,
    /// Counter `compactor_fast_compact_bytes`.
    pub compact_fast_runner_bytes: GenericCounter<AtomicU64>,
    /// Counter vec `storage_level_compact_write`, labels `group`, `level_index`.
    pub compact_write_bytes: GenericCounterVec<AtomicU64>,
    /// Counter vec `storage_level_compact_read_curr`, labels `group`, `level_index`.
    pub compact_read_current_level: GenericCounterVec<AtomicU64>,
    /// Counter vec `storage_level_compact_read_next`, labels `group`, `level_index`.
    pub compact_read_next_level: GenericCounterVec<AtomicU64>,
    /// Counter vec `storage_level_compact_read_sstn_curr`, labels `group`, `level_index`.
    pub compact_read_sstn_current_level: GenericCounterVec<AtomicU64>,
    /// Counter vec `storage_level_compact_read_sstn_next`, labels `group`, `level_index`.
    pub compact_read_sstn_next_level: GenericCounterVec<AtomicU64>,
    /// Counter vec `storage_level_compact_write_sstn`, labels `group`, `level_index`.
    pub compact_write_sstn: GenericCounterVec<AtomicU64>,
    /// Histogram `compactor_compact_sst_duration` (time buckets).
    pub compact_sst_duration: Histogram,
    /// Histogram vec `compactor_compact_task_duration`, labels `group`, `level`.
    pub compact_task_duration: HistogramVec,
    /// Gauge `storage_compact_task_pending_num`.
    pub compact_task_pending_num: IntGauge,
    /// Gauge `storage_compact_task_pending_parallelism`.
    pub compact_task_pending_parallelism: IntGauge,
    /// Histogram `state_store_write_build_l0_sst_duration` (time buckets).
    pub write_build_l0_sst_duration: Histogram,
    /// Histogram `compactor_shared_buffer_to_sstable_size` (size buckets).
    pub shared_buffer_to_sstable_size: Histogram,
    /// Histogram `compactor_get_table_id_total_time_duration` (time buckets).
    pub get_table_id_total_time_duration: Histogram,
    /// Histogram `compactor_remote_read_time` (time buckets).
    pub remote_read_time: Histogram,
    /// Histogram `compactor_sstable_bloom_filter_size`.
    pub sstable_bloom_filter_size: Histogram,
    /// Histogram `compactor_sstable_file_size` (size buckets).
    pub sstable_file_size: Histogram,
    /// Histogram `compactor_sstable_avg_key_size` (size buckets).
    pub sstable_avg_key_size: Histogram,
    /// Histogram `compactor_sstable_avg_value_size` (size buckets).
    pub sstable_avg_value_size: Histogram,
    /// Counter vec `compactor_iter_scan_key_counts`, label `type`.
    pub iter_scan_key_counts: GenericCounterVec<AtomicU64>,
    /// Counter `compactor_write_build_l0_bytes`.
    pub write_build_l0_bytes: GenericCounter<AtomicU64>,
    /// Histogram `compactor_sstable_distinct_epoch_count`.
    pub sstable_distinct_epoch_count: Histogram,
    /// Histogram `compactor_compaction_event_consumed_latency` (time buckets).
    pub compaction_event_consumed_latency: Histogram,
    /// Histogram `compactor_compaction_event_loop_iteration_latency` (time buckets).
    pub compaction_event_loop_iteration_latency: Histogram,
    /// Histogram `compactor_sstable_block_size` (size buckets).
    pub sstable_block_size: Histogram,
}
55
56pub static GLOBAL_COMPACTOR_METRICS: LazyLock<CompactorMetrics> =
57    LazyLock::new(|| CompactorMetrics::new(&GLOBAL_METRICS_REGISTRY));
58
59impl CompactorMetrics {
60    fn new(registry: &Registry) -> Self {
61        // 256B - 4GB
62        let size_buckets = exponential_buckets(256.0, 16.0, 7).unwrap();
63        // 10ms - 2.7h
64        let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
65        let opts = histogram_opts!(
66            "compactor_shared_buffer_to_sstable_size",
67            "Histogram of batch size compacted from shared buffer to remote storage",
68            size_buckets.clone()
69        );
70        let shared_buffer_to_sstable_size =
71            register_histogram_with_registry!(opts, registry).unwrap();
72
73        let compaction_upload_sst_counts = register_int_counter_with_registry!(
74            "compactor_compaction_upload_sst_counts",
75            "Total number of sst uploads during compaction",
76            registry
77        )
78        .unwrap();
79
80        let opts = histogram_opts!(
81            "compactor_compact_sst_duration",
82            "Total time of compact_key_range that have been issued to state store",
83            time_buckets.clone()
84        );
85        let compact_sst_duration = register_histogram_with_registry!(opts, registry).unwrap();
86        let opts = histogram_opts!(
87            "compactor_compact_task_duration",
88            "Total time of compact that have been issued to state store",
89            time_buckets.clone()
90        );
91        let compact_task_duration =
92            register_histogram_vec_with_registry!(opts, &["group", "level"], registry).unwrap();
93
94        let opts = histogram_opts!(
95            "compactor_get_table_id_total_time_duration",
96            "Total time of compact that have been issued to state store",
97            time_buckets.clone()
98        );
99        let get_table_id_total_time_duration =
100            register_histogram_with_registry!(opts, registry).unwrap();
101
102        let opts = histogram_opts!(
103            "compactor_remote_read_time",
104            "Total time of operations which read from remote storage when enable prefetch",
105            time_buckets.clone()
106        );
107        let remote_read_time = register_histogram_with_registry!(opts, registry).unwrap();
108
109        let compact_read_current_level = register_int_counter_vec_with_registry!(
110            "storage_level_compact_read_curr",
111            "KBs read from current level during history compactions to next level",
112            &["group", "level_index"],
113            registry
114        )
115        .unwrap();
116
117        let compact_read_next_level = register_int_counter_vec_with_registry!(
118            "storage_level_compact_read_next",
119            "KBs read from next level during history compactions to next level",
120            &["group", "level_index"],
121            registry
122        )
123        .unwrap();
124
125        let compact_write_bytes = register_int_counter_vec_with_registry!(
126            "storage_level_compact_write",
127            "KBs written into next level during history compactions to next level",
128            &["group", "level_index"],
129            registry
130        )
131        .unwrap();
132
133        let compact_read_sstn_current_level = register_int_counter_vec_with_registry!(
134            "storage_level_compact_read_sstn_curr",
135            "num of SSTs read from current level during history compactions to next level",
136            &["group", "level_index"],
137            registry
138        )
139        .unwrap();
140
141        let compact_read_sstn_next_level = register_int_counter_vec_with_registry!(
142            "storage_level_compact_read_sstn_next",
143            "num of SSTs read from next level during history compactions to next level",
144            &["group", "level_index"],
145            registry
146        )
147        .unwrap();
148
149        let compact_write_sstn = register_int_counter_vec_with_registry!(
150            "storage_level_compact_write_sstn",
151            "num of SSTs written into next level during history compactions to next level",
152            &["group", "level_index"],
153            registry
154        )
155        .unwrap();
156
157        let compact_task_pending_num = register_int_gauge_with_registry!(
158            "storage_compact_task_pending_num",
159            "the num of storage compact task",
160            registry
161        )
162        .unwrap();
163
164        let compact_task_pending_parallelism = register_int_gauge_with_registry!(
165            "storage_compact_task_pending_parallelism",
166            "the num of storage compact parallelism",
167            registry
168        )
169        .unwrap();
170
171        let opts = histogram_opts!(
172            "compactor_sstable_bloom_filter_size",
173            "Total bytes gotten from sstable_bloom_filter, for observing bloom_filter size",
174            exponential_buckets(16.0, 16.0, 7).unwrap() // max 256MB
175        );
176
177        let sstable_bloom_filter_size = register_histogram_with_registry!(opts, registry).unwrap();
178
179        let opts = histogram_opts!(
180            "compactor_sstable_file_size",
181            "Total bytes gotten from sstable_file_size, for observing sstable_file_size",
182            size_buckets.clone()
183        );
184
185        let sstable_file_size = register_histogram_with_registry!(opts, registry).unwrap();
186
187        let opts = histogram_opts!(
188            "compactor_sstable_avg_key_size",
189            "Total bytes gotten from sstable_avg_key_size, for observing sstable_avg_key_size",
190            size_buckets.clone()
191        );
192
193        let sstable_avg_key_size = register_histogram_with_registry!(opts, registry).unwrap();
194
195        let opts = histogram_opts!(
196            "compactor_sstable_avg_value_size",
197            "Total bytes gotten from sstable_avg_value_size, for observing sstable_avg_value_size",
198            size_buckets.clone()
199        );
200
201        let sstable_avg_value_size = register_histogram_with_registry!(opts, registry).unwrap();
202
203        let opts = histogram_opts!(
204            "state_store_write_build_l0_sst_duration",
205            "Total time of batch_write_build_table that have been issued to state store",
206            time_buckets.clone()
207        );
208        let write_build_l0_sst_duration =
209            register_histogram_with_registry!(opts, registry).unwrap();
210
211        let iter_scan_key_counts = register_int_counter_vec_with_registry!(
212            "compactor_iter_scan_key_counts",
213            "Total number of keys read by iterator",
214            &["type"],
215            registry
216        )
217        .unwrap();
218
219        let write_build_l0_bytes = register_int_counter_with_registry!(
220            "compactor_write_build_l0_bytes",
221            "Total size of compaction files size that have been written to object store from shared buffer",
222            registry
223        ).unwrap();
224        let compact_fast_runner_bytes = register_int_counter_with_registry!(
225            "compactor_fast_compact_bytes",
226            "Total size of compaction files size of fast compactor runner",
227            registry
228        )
229        .unwrap();
230        let opts = histogram_opts!(
231            "compactor_sstable_distinct_epoch_count",
232            "Total number gotten from sstable_distinct_epoch_count, for observing sstable_distinct_epoch_count",
233            exponential_buckets(1.0, 10.0, 6).unwrap()
234        );
235
236        let sstable_distinct_epoch_count =
237            register_histogram_with_registry!(opts, registry).unwrap();
238
239        let opts = histogram_opts!(
240            "compactor_compaction_event_consumed_latency",
241            "The latency of each event being consumed",
242            time_buckets.clone()
243        );
244        let compaction_event_consumed_latency =
245            register_histogram_with_registry!(opts, registry).unwrap();
246
247        let opts = histogram_opts!(
248            "compactor_compaction_event_loop_iteration_latency",
249            "The latency of each iteration of the compaction event loop",
250            time_buckets
251        );
252        let compaction_event_loop_iteration_latency =
253            register_histogram_with_registry!(opts, registry).unwrap();
254
255        let opts = histogram_opts!(
256            "compactor_sstable_block_size",
257            "Total bytes gotten from sstable_block_size, for observing sstable_block_size",
258            size_buckets,
259        );
260
261        let sstable_block_size = register_histogram_with_registry!(opts, registry).unwrap();
262
263        Self {
264            compaction_upload_sst_counts,
265            compact_fast_runner_bytes,
266            compact_write_bytes,
267            compact_read_current_level,
268            compact_read_next_level,
269            compact_read_sstn_current_level,
270            compact_read_sstn_next_level,
271            compact_write_sstn,
272            compact_sst_duration,
273            compact_task_duration,
274            compact_task_pending_num,
275            compact_task_pending_parallelism,
276            write_build_l0_sst_duration,
277            shared_buffer_to_sstable_size,
278            get_table_id_total_time_duration,
279            remote_read_time,
280            sstable_bloom_filter_size,
281            sstable_file_size,
282            sstable_avg_key_size,
283            sstable_avg_value_size,
284            iter_scan_key_counts,
285            write_build_l0_bytes,
286            sstable_distinct_epoch_count,
287            compaction_event_consumed_latency,
288            compaction_event_loop_iteration_latency,
289            sstable_block_size,
290        }
291    }
292
293    /// Creates a new `HummockStateStoreMetrics` instance used in tests or other places.
294    pub fn unused() -> Self {
295        GLOBAL_COMPACTOR_METRICS.clone()
296    }
297}