risingwave_storage/monitor/
monitored_storage_metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::{Arc, OnceLock};
18use std::time::{Duration, Instant};
19
20use prometheus::{
21    Histogram, Registry, exponential_buckets, histogram_opts, linear_buckets,
22    register_histogram_with_registry,
23};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::metrics::{
26    LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedLocalHistogram,
27    LabelGuardedLocalIntCounter, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
28    RelabeledGuardedIntGaugeVec,
29};
30use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
31use risingwave_common::{
32    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33    register_guarded_int_gauge_vec_with_registry,
34};
35
36use crate::store::{
37    ChangeLogValue, IterItem, StateStoreKeyedRow, StateStoreKeyedRowRef, StateStoreReadLogItem,
38    StateStoreReadLogItemRef,
39};
40
41/// [`MonitoredStorageMetrics`] stores the performance and IO metrics of Storage.
42#[derive(Debug, Clone)]
43pub struct MonitoredStorageMetrics {
44    pub get_duration: RelabeledGuardedHistogramVec<1>,
45    pub get_key_size: RelabeledGuardedHistogramVec<1>,
46    pub get_value_size: RelabeledGuardedHistogramVec<1>,
47
48    // [table_id, iter_type: {"iter", "iter_log"}]
49    pub iter_size: RelabeledGuardedHistogramVec<2>,
50    pub iter_item: RelabeledGuardedHistogramVec<2>,
51    pub iter_init_duration: RelabeledGuardedHistogramVec<2>,
52    pub iter_scan_duration: RelabeledGuardedHistogramVec<2>,
53    pub iter_counts: RelabeledGuardedIntCounterVec<2>,
54    pub iter_in_progress_counts: RelabeledGuardedIntGaugeVec<2>,
55
56    // [table_id, op_type]
57    pub iter_log_op_type_counts: LabelGuardedIntCounterVec<2>,
58
59    pub sync_duration: Histogram,
60    pub sync_size: Histogram,
61}
62
63pub static GLOBAL_STORAGE_METRICS: OnceLock<MonitoredStorageMetrics> = OnceLock::new();
64
65pub fn global_storage_metrics(metric_level: MetricLevel) -> MonitoredStorageMetrics {
66    GLOBAL_STORAGE_METRICS
67        .get_or_init(|| MonitoredStorageMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
68        .clone()
69}
70
71impl MonitoredStorageMetrics {
72    pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
73        // 256B ~ max 64GB
74        let size_buckets = exponential_buckets(256.0, 16.0, 8).unwrap();
75        // 10ms ~ max 2.7h
76        let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
77        // ----- get -----
78        let opts = histogram_opts!(
79            "state_store_get_key_size",
80            "Total key bytes of get that have been issued to state store",
81            size_buckets.clone()
82        );
83        let get_key_size =
84            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
85        let get_key_size = RelabeledGuardedHistogramVec::with_metric_level(
86            MetricLevel::Debug,
87            get_key_size,
88            metric_level,
89        );
90
91        let opts = histogram_opts!(
92            "state_store_get_value_size",
93            "Total value bytes that have been requested from remote storage",
94            size_buckets.clone()
95        );
96        let get_value_size =
97            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
98        let get_value_size = RelabeledGuardedHistogramVec::with_metric_level(
99            MetricLevel::Debug,
100            get_value_size,
101            metric_level,
102        );
103
104        let mut buckets = exponential_buckets(0.000004, 2.0, 4).unwrap(); // 4 ~ 32us
105        buckets.extend(linear_buckets(0.00006, 0.00004, 5).unwrap()); // 60 ~ 220us.
106        buckets.extend(linear_buckets(0.0003, 0.0001, 3).unwrap()); // 300 ~ 500us.
107        buckets.extend(exponential_buckets(0.001, 2.0, 5).unwrap()); // 1 ~ 16ms.
108        buckets.extend(exponential_buckets(0.05, 4.0, 5).unwrap()); // 0.05 ~ 1.28s.
109        buckets.push(16.0); // 16s
110
111        // 1ms - 100s
112        let mut state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
113        state_store_read_time_buckets.push(40.0);
114        state_store_read_time_buckets.push(100.0);
115
116        let get_duration_opts = histogram_opts!(
117            "state_store_get_duration",
118            "Total latency of get that have been issued to state store",
119            state_store_read_time_buckets.clone(),
120        );
121        let get_duration = register_guarded_histogram_vec_with_registry!(
122            get_duration_opts,
123            &["table_id"],
124            registry
125        )
126        .unwrap();
127        let get_duration = RelabeledGuardedHistogramVec::with_metric_level(
128            MetricLevel::Critical,
129            get_duration,
130            metric_level,
131        );
132
133        let opts = histogram_opts!(
134            "state_store_iter_size",
135            "Total bytes gotten from state store scan(), for calculating read throughput",
136            size_buckets.clone()
137        );
138        let iter_size = register_guarded_histogram_vec_with_registry!(
139            opts,
140            &["table_id", "iter_type"],
141            registry
142        )
143        .unwrap();
144        let iter_size = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
145            MetricLevel::Debug,
146            iter_size,
147            metric_level,
148            1,
149        );
150
151        let opts = histogram_opts!(
152            "state_store_iter_item",
153            "Total bytes gotten from state store scan(), for calculating read throughput",
154            size_buckets.clone(),
155        );
156        let iter_item = register_guarded_histogram_vec_with_registry!(
157            opts,
158            &["table_id", "iter_type"],
159            registry
160        )
161        .unwrap();
162        let iter_item = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
163            MetricLevel::Debug,
164            iter_item,
165            metric_level,
166            1,
167        );
168
169        let opts = histogram_opts!(
170            "state_store_iter_init_duration",
171            "Histogram of the time spent on iterator initialization.",
172            state_store_read_time_buckets.clone(),
173        );
174        let iter_init_duration = register_guarded_histogram_vec_with_registry!(
175            opts,
176            &["table_id", "iter_type"],
177            registry
178        )
179        .unwrap();
180        let iter_init_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
181            MetricLevel::Critical,
182            iter_init_duration,
183            metric_level,
184            1,
185        );
186
187        let opts = histogram_opts!(
188            "state_store_iter_scan_duration",
189            "Histogram of the time spent on iterator scanning.",
190            state_store_read_time_buckets.clone(),
191        );
192        let iter_scan_duration = register_guarded_histogram_vec_with_registry!(
193            opts,
194            &["table_id", "iter_type"],
195            registry
196        )
197        .unwrap();
198        let iter_scan_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
199            MetricLevel::Critical,
200            iter_scan_duration,
201            metric_level,
202            1,
203        );
204
205        let iter_counts = register_guarded_int_counter_vec_with_registry!(
206            "state_store_iter_counts",
207            "Total number of iter that have been issued to state store",
208            &["table_id", "iter_type"],
209            registry
210        )
211        .unwrap();
212        let iter_counts = RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
213            MetricLevel::Debug,
214            iter_counts,
215            metric_level,
216            1,
217        );
218
219        let iter_in_progress_counts = register_guarded_int_gauge_vec_with_registry!(
220            "state_store_iter_in_progress_counts",
221            "Total number of existing iter",
222            &["table_id", "iter_type"],
223            registry
224        )
225        .unwrap();
226        let iter_in_progress_counts = RelabeledGuardedIntGaugeVec::with_metric_level_relabel_n(
227            MetricLevel::Debug,
228            iter_in_progress_counts,
229            metric_level,
230            1,
231        );
232
233        let iter_log_op_type_counts = register_guarded_int_counter_vec_with_registry!(
234            "state_store_iter_log_op_type_counts",
235            "Counter of each op type in iter_log",
236            &["table_id", "op_type"],
237            registry
238        )
239        .unwrap();
240
241        let opts = histogram_opts!(
242            "state_store_sync_duration",
243            "Histogram of time spent on compacting shared buffer to remote storage",
244            time_buckets,
245        );
246        let sync_duration = register_histogram_with_registry!(opts, registry).unwrap();
247
248        let opts = histogram_opts!(
249            "state_store_sync_size",
250            "Total size of upload to l0 every epoch",
251            size_buckets,
252        );
253        let sync_size = register_histogram_with_registry!(opts, registry).unwrap();
254
255        Self {
256            get_duration,
257            get_key_size,
258            get_value_size,
259            iter_size,
260            iter_item,
261            iter_init_duration,
262            iter_scan_duration,
263            iter_counts,
264            iter_in_progress_counts,
265            iter_log_op_type_counts,
266            sync_duration,
267            sync_size,
268        }
269    }
270
271    pub fn unused() -> Self {
272        global_storage_metrics(MetricLevel::Disabled)
273    }
274
275    fn local_iter_metrics(&self, table_label: &str) -> LocalIterMetrics {
276        let inner = self.new_local_iter_metrics_inner(table_label, "iter");
277        LocalIterMetrics {
278            inner,
279            report_count: 0,
280        }
281    }
282
283    fn new_local_iter_metrics_inner(
284        &self,
285        table_label: &str,
286        iter_type: &str,
287    ) -> LocalIterMetricsInner {
288        let iter_init_duration = self
289            .iter_init_duration
290            .with_guarded_label_values(&[table_label, iter_type])
291            .local();
292        let iter_counts = self
293            .iter_counts
294            .with_guarded_label_values(&[table_label, iter_type])
295            .local();
296        let iter_scan_duration = self
297            .iter_scan_duration
298            .with_guarded_label_values(&[table_label, iter_type])
299            .local();
300        let iter_item = self
301            .iter_item
302            .with_guarded_label_values(&[table_label, iter_type])
303            .local();
304        let iter_size = self
305            .iter_size
306            .with_guarded_label_values(&[table_label, iter_type])
307            .local();
308        let iter_in_progress_counts = self
309            .iter_in_progress_counts
310            .with_guarded_label_values(&[table_label, iter_type]);
311
312        LocalIterMetricsInner {
313            iter_init_duration,
314            iter_scan_duration,
315            iter_counts,
316            iter_item,
317            iter_size,
318            iter_in_progress_counts,
319        }
320    }
321
322    fn local_iter_log_metrics(&self, table_label: &str) -> LocalIterLogMetrics {
323        let iter_metrics = self.new_local_iter_metrics_inner(table_label, "iter_log");
324        let insert_count = self
325            .iter_log_op_type_counts
326            .with_guarded_label_values(&[table_label, "INSERT"])
327            .local();
328        let update_count = self
329            .iter_log_op_type_counts
330            .with_guarded_label_values(&[table_label, "UPDATE"])
331            .local();
332        let delete_count = self
333            .iter_log_op_type_counts
334            .with_guarded_label_values(&[table_label, "DELETE"])
335            .local();
336        LocalIterLogMetrics {
337            iter_metrics,
338            insert_count,
339            update_count,
340            delete_count,
341            report_count: 0,
342        }
343    }
344
345    fn local_get_metrics(&self, table_label: &str) -> LocalGetMetrics {
346        let get_duration = self
347            .get_duration
348            .with_guarded_label_values(&[table_label])
349            .local();
350        let get_key_size = self
351            .get_key_size
352            .with_guarded_label_values(&[table_label])
353            .local();
354        let get_value_size = self
355            .get_value_size
356            .with_guarded_label_values(&[table_label])
357            .local();
358
359        LocalGetMetrics {
360            get_duration,
361            get_key_size,
362            get_value_size,
363            report_count: 0,
364        }
365    }
366}
367
368struct LocalIterMetricsInner {
369    iter_init_duration: LabelGuardedLocalHistogram<2>,
370    iter_scan_duration: LabelGuardedLocalHistogram<2>,
371    iter_counts: LabelGuardedLocalIntCounter<2>,
372    iter_item: LabelGuardedLocalHistogram<2>,
373    iter_size: LabelGuardedLocalHistogram<2>,
374    iter_in_progress_counts: LabelGuardedIntGauge<2>,
375}
376
377struct LocalIterMetrics {
378    inner: LocalIterMetricsInner,
379    report_count: usize,
380}
381
382impl LocalIterMetrics {
383    fn may_flush(&mut self) {
384        self.report_count += 1;
385        if self.report_count > MAX_FLUSH_TIMES {
386            self.inner.flush();
387            self.report_count = 0;
388        }
389    }
390}
391
392impl LocalIterMetricsInner {
393    fn flush(&mut self) {
394        self.iter_scan_duration.flush();
395        self.iter_init_duration.flush();
396        self.iter_counts.flush();
397        self.iter_item.flush();
398        self.iter_size.flush();
399    }
400}
401
402struct LocalGetMetrics {
403    get_duration: LabelGuardedLocalHistogram<1>,
404    get_key_size: LabelGuardedLocalHistogram<1>,
405    get_value_size: LabelGuardedLocalHistogram<1>,
406    report_count: usize,
407}
408
409impl LocalGetMetrics {
410    fn may_flush(&mut self) {
411        self.report_count += 1;
412        if self.report_count > MAX_FLUSH_TIMES {
413            self.get_duration.flush();
414            self.get_key_size.flush();
415            self.get_value_size.flush();
416            self.report_count = 0;
417        }
418    }
419}
420
421struct LocalIterLogMetrics {
422    iter_metrics: LocalIterMetricsInner,
423    insert_count: LabelGuardedLocalIntCounter<2>,
424    update_count: LabelGuardedLocalIntCounter<2>,
425    delete_count: LabelGuardedLocalIntCounter<2>,
426    report_count: usize,
427}
428
429impl LocalIterLogMetrics {
430    fn may_flush(&mut self) {
431        self.report_count += 1;
432        if self.report_count > MAX_FLUSH_TIMES {
433            self.iter_metrics.flush();
434            self.insert_count.flush();
435            self.update_count.flush();
436            self.delete_count.flush();
437            self.report_count = 0;
438        }
439    }
440}
441
442pub(crate) trait StateStoreIterStatsTrait: Send {
443    type Item: IterItem;
444    fn new(table_id: u32, metrics: &MonitoredStorageMetrics, iter_init_duration: Duration) -> Self;
445    fn observe(&mut self, item: <Self::Item as IterItem>::ItemRef<'_>);
446    fn report(&mut self, table_id: u32, metrics: &MonitoredStorageMetrics);
447}
448
/// Number of reports a thread-local metric set may accumulate without flushing.
///
/// The `may_flush` methods flush once their report counter exceeds this value, i.e. on
/// every 65th report, and then reset the counter.
const MAX_FLUSH_TIMES: usize = 64;
450
/// Raw numbers gathered for one iterator before they are applied to the metrics.
struct StateStoreIterStatsInner {
    /// Time the iterator took to initialize, as supplied by the caller.
    pub iter_init_duration: Duration,
    /// Instant at which these stats were created; the scan duration is the time elapsed
    /// since then when the stats are reported.
    pub iter_scan_time: Instant,
    /// Number of items observed so far.
    pub total_items: usize,
    /// Number of bytes observed so far.
    pub total_size: usize,
}
457
458impl StateStoreIterStatsInner {
459    fn new(iter_init_duration: Duration) -> Self {
460        Self {
461            iter_init_duration,
462            iter_scan_time: Instant::now(),
463            total_items: 0,
464            total_size: 0,
465        }
466    }
467}
468
469pub(crate) struct MonitoredStateStoreIterStats<S: StateStoreIterStatsTrait> {
470    pub inner: S,
471    pub table_id: u32,
472    pub metrics: Arc<MonitoredStorageMetrics>,
473}
474
475impl<S: StateStoreIterStatsTrait> Drop for MonitoredStateStoreIterStats<S> {
476    fn drop(&mut self) {
477        self.inner.report(self.table_id, &self.metrics)
478    }
479}
480
481pub(crate) struct StateStoreIterStats {
482    inner: StateStoreIterStatsInner,
483}
484
485impl StateStoreIterStats {
486    fn for_table_metrics(
487        table_id: u32,
488        global_metrics: &MonitoredStorageMetrics,
489        f: impl FnOnce(&mut LocalIterMetrics),
490    ) {
491        thread_local!(static LOCAL_ITER_METRICS: RefCell<HashMap<u32, LocalIterMetrics>> = RefCell::new(HashMap::default()));
492        LOCAL_ITER_METRICS.with_borrow_mut(|local_metrics| {
493            let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
494                let table_label = table_id.to_string();
495                global_metrics.local_iter_metrics(&table_label)
496            });
497            f(table_metrics)
498        });
499    }
500}
501
502impl StateStoreIterStatsTrait for StateStoreIterStats {
503    type Item = StateStoreKeyedRow;
504
505    fn new(table_id: u32, metrics: &MonitoredStorageMetrics, iter_init_duration: Duration) -> Self {
506        Self::for_table_metrics(table_id, metrics, |metrics| {
507            metrics.inner.iter_in_progress_counts.inc();
508        });
509        Self {
510            inner: StateStoreIterStatsInner::new(iter_init_duration),
511        }
512    }
513
514    fn observe(&mut self, (key, value): StateStoreKeyedRowRef<'_>) {
515        self.inner.total_items += 1;
516        self.inner.total_size += key.encoded_len() + value.len();
517    }
518
519    fn report(&mut self, table_id: u32, metrics: &MonitoredStorageMetrics) {
520        Self::for_table_metrics(table_id, metrics, |table_metrics| {
521            self.inner.apply_to_local(&mut table_metrics.inner);
522            table_metrics.may_flush();
523        });
524    }
525}
526
527impl StateStoreIterStatsInner {
528    fn apply_to_local(&self, table_metrics: &mut LocalIterMetricsInner) {
529        {
530            let iter_scan_duration = self.iter_scan_time.elapsed();
531            table_metrics
532                .iter_scan_duration
533                .observe(iter_scan_duration.as_secs_f64());
534            table_metrics
535                .iter_init_duration
536                .observe(self.iter_init_duration.as_secs_f64());
537            table_metrics.iter_counts.inc();
538            table_metrics.iter_item.observe(self.total_items as f64);
539            table_metrics.iter_size.observe(self.total_size as f64);
540            table_metrics.iter_in_progress_counts.dec();
541        }
542    }
543}
544
545pub(crate) struct StateStoreIterLogStats {
546    inner: StateStoreIterStatsInner,
547    insert_count: u64,
548    update_count: u64,
549    delete_count: u64,
550}
551
552impl StateStoreIterLogStats {
553    fn for_table_metrics(
554        table_id: u32,
555        global_metrics: &MonitoredStorageMetrics,
556        f: impl FnOnce(&mut LocalIterLogMetrics),
557    ) {
558        thread_local!(static LOCAL_ITER_LOG_METRICS: RefCell<HashMap<u32, LocalIterLogMetrics>> = RefCell::new(HashMap::default()));
559        LOCAL_ITER_LOG_METRICS.with_borrow_mut(|local_metrics| {
560            let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
561                let table_label = table_id.to_string();
562                global_metrics.local_iter_log_metrics(&table_label)
563            });
564            f(table_metrics)
565        });
566    }
567}
568
569impl StateStoreIterStatsTrait for StateStoreIterLogStats {
570    type Item = StateStoreReadLogItem;
571
572    fn new(table_id: u32, metrics: &MonitoredStorageMetrics, iter_init_duration: Duration) -> Self {
573        Self::for_table_metrics(table_id, metrics, |metrics| {
574            metrics.iter_metrics.iter_in_progress_counts.inc();
575        });
576        Self {
577            inner: StateStoreIterStatsInner::new(iter_init_duration),
578            insert_count: 0,
579            update_count: 0,
580            delete_count: 0,
581        }
582    }
583
584    fn observe(&mut self, (key, change_log): StateStoreReadLogItemRef<'_>) {
585        self.inner.total_items += 1;
586        let value_len = match change_log {
587            ChangeLogValue::Insert(value) => {
588                self.insert_count += 1;
589                value.len()
590            }
591            ChangeLogValue::Update {
592                old_value,
593                new_value,
594            } => {
595                self.update_count += 1;
596                old_value.len() + new_value.len()
597            }
598            ChangeLogValue::Delete(value) => {
599                self.delete_count += 1;
600                value.len()
601            }
602        };
603        self.inner.total_size += key.len() + value_len;
604    }
605
606    fn report(&mut self, table_id: u32, metrics: &MonitoredStorageMetrics) {
607        Self::for_table_metrics(table_id, metrics, |table_metrics| {
608            self.inner.apply_to_local(&mut table_metrics.iter_metrics);
609            table_metrics.insert_count.inc_by(self.insert_count);
610            table_metrics.update_count.inc_by(self.update_count);
611            table_metrics.delete_count.inc_by(self.delete_count);
612            table_metrics.may_flush();
613        });
614    }
615}
616
617pub(crate) struct MonitoredStateStoreGetStats {
618    pub get_duration: Instant,
619    pub get_key_size: usize,
620    pub get_value_size: usize,
621    pub table_id: u32,
622    pub metrics: Arc<MonitoredStorageMetrics>,
623}
624
625impl MonitoredStateStoreGetStats {
626    pub(crate) fn new(table_id: u32, metrics: Arc<MonitoredStorageMetrics>) -> Self {
627        Self {
628            get_duration: Instant::now(),
629            get_key_size: 0,
630            get_value_size: 0,
631            table_id,
632            metrics,
633        }
634    }
635
636    pub(crate) fn report(&self) {
637        thread_local!(static LOCAL_GET_METRICS: RefCell<HashMap<u32, LocalGetMetrics>> = RefCell::new(HashMap::default()));
638        LOCAL_GET_METRICS.with_borrow_mut(|local_metrics| {
639            let table_metrics = local_metrics.entry(self.table_id).or_insert_with(|| {
640                let table_label = self.table_id.to_string();
641                self.metrics.local_get_metrics(&table_label)
642            });
643            let get_duration = self.get_duration.elapsed();
644            table_metrics
645                .get_duration
646                .observe(get_duration.as_secs_f64());
647            table_metrics.get_key_size.observe(self.get_key_size as _);
648            if self.get_value_size > 0 {
649                table_metrics
650                    .get_value_size
651                    .observe(self.get_value_size as _);
652            }
653            table_metrics.may_flush();
654        });
655    }
656}