risingwave_storage/monitor/
monitored_storage_metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::{Arc, OnceLock};
18use std::time::{Duration, Instant};
19
20use prometheus::{
21    Histogram, Registry, exponential_buckets, histogram_opts, linear_buckets,
22    register_histogram_with_registry,
23};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::id::TableId;
26use risingwave_common::metrics::{
27    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGauge,
28    LabelGuardedLocalHistogram, LabelGuardedLocalIntCounter, RelabeledGuardedHistogramVec,
29    RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
30};
31use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
32use risingwave_common::{
33    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
34    register_guarded_int_gauge_vec_with_registry,
35};
36
37use crate::store::{
38    ChangeLogValue, IterItem, StateStoreKeyedRow, StateStoreKeyedRowRef, StateStoreReadLogItem,
39    StateStoreReadLogItemRef,
40};
41
/// [`MonitoredStorageMetrics`] stores the performance and IO metrics of Storage.
///
/// The process-wide instance is normally obtained through `global_storage_metrics`, which
/// hands out clones of a lazily-created value registered in `GLOBAL_METRICS_REGISTRY`.
/// NOTE(review): clones presumably share the underlying metric series (prometheus handles are
/// reference-counted) — confirm for the `Relabeled*` wrappers.
#[derive(Debug, Clone)]
pub struct MonitoredStorageMetrics {
    // [table_id]
    /// Latency of `get` calls, observed in seconds.
    pub get_duration: RelabeledGuardedHistogramVec,
    /// Key bytes of `get` calls.
    pub get_key_size: RelabeledGuardedHistogramVec,
    /// Value bytes of `get` calls; only observed when the recorded size is non-zero.
    pub get_value_size: RelabeledGuardedHistogramVec,

    // [table_id, iter_type: {"iter", "iter_log"}]
    /// Total key + value bytes yielded by one iterator.
    pub iter_size: RelabeledGuardedHistogramVec,
    /// Number of items yielded by one iterator.
    pub iter_item: RelabeledGuardedHistogramVec,
    /// Time spent initializing an iterator, in seconds.
    pub iter_init_duration: RelabeledGuardedHistogramVec,
    /// Time from iterator creation to its stats being reported, in seconds.
    pub iter_scan_duration: RelabeledGuardedHistogramVec,
    /// Number of iterators that have reported.
    pub iter_counts: RelabeledGuardedIntCounterVec,
    /// Iterators created but not yet reported (incremented on creation, decremented on report).
    pub iter_in_progress_counts: RelabeledGuardedIntGaugeVec,

    // [table_id, op_type]
    /// Per-op-type ("INSERT" / "UPDATE" / "DELETE") item counts of `iter_log` iterators.
    pub iter_log_op_type_counts: LabelGuardedIntCounterVec,

    // [table_id, top_n, ef_search]
    /// Latency of vector nearest-neighbour queries.
    pub vector_nearest_duration: LabelGuardedHistogramVec,

    // no labels
    /// Time spent syncing (compacting shared buffer to remote storage).
    pub sync_duration: Histogram,
    /// Size uploaded to L0 per epoch.
    pub sync_size: Histogram,
}
66
67pub static GLOBAL_STORAGE_METRICS: OnceLock<MonitoredStorageMetrics> = OnceLock::new();
68
69pub fn global_storage_metrics(metric_level: MetricLevel) -> MonitoredStorageMetrics {
70    GLOBAL_STORAGE_METRICS
71        .get_or_init(|| MonitoredStorageMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
72        .clone()
73}
74
75impl MonitoredStorageMetrics {
76    pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
77        // 256B ~ max 64GB
78        let size_buckets = exponential_buckets(256.0, 16.0, 8).unwrap();
79        // 10ms ~ max 2.7h
80        let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
81        // ----- get -----
82        let opts = histogram_opts!(
83            "state_store_get_key_size",
84            "Total key bytes of get that have been issued to state store",
85            size_buckets.clone()
86        );
87        let get_key_size =
88            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
89        let get_key_size = RelabeledGuardedHistogramVec::with_metric_level(
90            MetricLevel::Debug,
91            get_key_size,
92            metric_level,
93        );
94
95        let opts = histogram_opts!(
96            "state_store_get_value_size",
97            "Total value bytes that have been requested from remote storage",
98            size_buckets.clone()
99        );
100        let get_value_size =
101            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
102        let get_value_size = RelabeledGuardedHistogramVec::with_metric_level(
103            MetricLevel::Debug,
104            get_value_size,
105            metric_level,
106        );
107
108        let mut buckets = exponential_buckets(0.000004, 2.0, 4).unwrap(); // 4 ~ 32us
109        buckets.extend(linear_buckets(0.00006, 0.00004, 5).unwrap()); // 60 ~ 220us.
110        buckets.extend(linear_buckets(0.0003, 0.0001, 3).unwrap()); // 300 ~ 500us.
111        buckets.extend(exponential_buckets(0.001, 2.0, 5).unwrap()); // 1 ~ 16ms.
112        buckets.extend(exponential_buckets(0.05, 4.0, 5).unwrap()); // 0.05 ~ 1.28s.
113        buckets.push(16.0); // 16s
114
115        // 1ms - 100s
116        let mut state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
117        state_store_read_time_buckets.push(40.0);
118        state_store_read_time_buckets.push(100.0);
119
120        let get_duration_opts = histogram_opts!(
121            "state_store_get_duration",
122            "Total latency of get that have been issued to state store",
123            state_store_read_time_buckets.clone(),
124        );
125        let get_duration = register_guarded_histogram_vec_with_registry!(
126            get_duration_opts,
127            &["table_id"],
128            registry
129        )
130        .unwrap();
131        let get_duration = RelabeledGuardedHistogramVec::with_metric_level(
132            MetricLevel::Critical,
133            get_duration,
134            metric_level,
135        );
136
137        let opts = histogram_opts!(
138            "state_store_iter_size",
139            "Total bytes gotten from state store scan(), for calculating read throughput",
140            size_buckets.clone()
141        );
142        let iter_size = register_guarded_histogram_vec_with_registry!(
143            opts,
144            &["table_id", "iter_type"],
145            registry
146        )
147        .unwrap();
148        let iter_size = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
149            MetricLevel::Debug,
150            iter_size,
151            metric_level,
152            1,
153        );
154
155        let opts = histogram_opts!(
156            "state_store_iter_item",
157            "Total bytes gotten from state store scan(), for calculating read throughput",
158            size_buckets.clone(),
159        );
160        let iter_item = register_guarded_histogram_vec_with_registry!(
161            opts,
162            &["table_id", "iter_type"],
163            registry
164        )
165        .unwrap();
166        let iter_item = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
167            MetricLevel::Debug,
168            iter_item,
169            metric_level,
170            1,
171        );
172
173        let opts = histogram_opts!(
174            "state_store_iter_init_duration",
175            "Histogram of the time spent on iterator initialization.",
176            state_store_read_time_buckets.clone(),
177        );
178        let iter_init_duration = register_guarded_histogram_vec_with_registry!(
179            opts,
180            &["table_id", "iter_type"],
181            registry
182        )
183        .unwrap();
184        let iter_init_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
185            MetricLevel::Critical,
186            iter_init_duration,
187            metric_level,
188            1,
189        );
190
191        let opts = histogram_opts!(
192            "state_store_iter_scan_duration",
193            "Histogram of the time spent on iterator scanning.",
194            state_store_read_time_buckets.clone(),
195        );
196        let iter_scan_duration = register_guarded_histogram_vec_with_registry!(
197            opts,
198            &["table_id", "iter_type"],
199            registry
200        )
201        .unwrap();
202        let iter_scan_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
203            MetricLevel::Critical,
204            iter_scan_duration,
205            metric_level,
206            1,
207        );
208
209        let iter_counts = register_guarded_int_counter_vec_with_registry!(
210            "state_store_iter_counts",
211            "Total number of iter that have been issued to state store",
212            &["table_id", "iter_type"],
213            registry
214        )
215        .unwrap();
216        let iter_counts = RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
217            MetricLevel::Debug,
218            iter_counts,
219            metric_level,
220            1,
221        );
222
223        let iter_in_progress_counts = register_guarded_int_gauge_vec_with_registry!(
224            "state_store_iter_in_progress_counts",
225            "Total number of existing iter",
226            &["table_id", "iter_type"],
227            registry
228        )
229        .unwrap();
230        let iter_in_progress_counts = RelabeledGuardedIntGaugeVec::with_metric_level_relabel_n(
231            MetricLevel::Debug,
232            iter_in_progress_counts,
233            metric_level,
234            1,
235        );
236
237        let iter_log_op_type_counts = register_guarded_int_counter_vec_with_registry!(
238            "state_store_iter_log_op_type_counts",
239            "Counter of each op type in iter_log",
240            &["table_id", "op_type"],
241            registry
242        )
243        .unwrap();
244
245        let opts = histogram_opts!(
246            "state_store_sync_duration",
247            "Histogram of time spent on compacting shared buffer to remote storage",
248            time_buckets,
249        );
250        let sync_duration = register_histogram_with_registry!(opts, registry).unwrap();
251
252        let opts = histogram_opts!(
253            "state_store_sync_size",
254            "Total size of upload to l0 every epoch",
255            size_buckets,
256        );
257        let sync_size = register_histogram_with_registry!(opts, registry).unwrap();
258
259        let vector_nearest_duration_opts = histogram_opts!(
260            "state_store_vector_nearest_duration",
261            "Total latency of vector nearest that have been issued to state store",
262            state_store_read_time_buckets.clone(),
263        );
264        let vector_nearest_duration = register_guarded_histogram_vec_with_registry!(
265            vector_nearest_duration_opts,
266            &["table_id", "top_n", "ef_search"],
267            registry
268        )
269        .unwrap();
270
271        Self {
272            get_duration,
273            get_key_size,
274            get_value_size,
275            iter_size,
276            iter_item,
277            iter_init_duration,
278            iter_scan_duration,
279            iter_counts,
280            iter_in_progress_counts,
281            iter_log_op_type_counts,
282            vector_nearest_duration,
283            sync_duration,
284            sync_size,
285        }
286    }
287
288    pub fn unused() -> Self {
289        global_storage_metrics(MetricLevel::Disabled)
290    }
291
292    fn local_iter_metrics(&self, table_label: &str) -> LocalIterMetrics {
293        let inner = self.new_local_iter_metrics_inner(table_label, "iter");
294        LocalIterMetrics {
295            inner,
296            report_count: 0,
297        }
298    }
299
300    fn new_local_iter_metrics_inner(
301        &self,
302        table_label: &str,
303        iter_type: &str,
304    ) -> LocalIterMetricsInner {
305        let iter_init_duration = self
306            .iter_init_duration
307            .with_guarded_label_values(&[table_label, iter_type])
308            .local();
309        let iter_counts = self
310            .iter_counts
311            .with_guarded_label_values(&[table_label, iter_type])
312            .local();
313        let iter_scan_duration = self
314            .iter_scan_duration
315            .with_guarded_label_values(&[table_label, iter_type])
316            .local();
317        let iter_item = self
318            .iter_item
319            .with_guarded_label_values(&[table_label, iter_type])
320            .local();
321        let iter_size = self
322            .iter_size
323            .with_guarded_label_values(&[table_label, iter_type])
324            .local();
325        let iter_in_progress_counts = self
326            .iter_in_progress_counts
327            .with_guarded_label_values(&[table_label, iter_type]);
328
329        LocalIterMetricsInner {
330            iter_init_duration,
331            iter_scan_duration,
332            iter_counts,
333            iter_item,
334            iter_size,
335            iter_in_progress_counts,
336        }
337    }
338
339    fn local_iter_log_metrics(&self, table_label: &str) -> LocalIterLogMetrics {
340        let iter_metrics = self.new_local_iter_metrics_inner(table_label, "iter_log");
341        let insert_count = self
342            .iter_log_op_type_counts
343            .with_guarded_label_values(&[table_label, "INSERT"])
344            .local();
345        let update_count = self
346            .iter_log_op_type_counts
347            .with_guarded_label_values(&[table_label, "UPDATE"])
348            .local();
349        let delete_count = self
350            .iter_log_op_type_counts
351            .with_guarded_label_values(&[table_label, "DELETE"])
352            .local();
353        LocalIterLogMetrics {
354            iter_metrics,
355            insert_count,
356            update_count,
357            delete_count,
358            report_count: 0,
359        }
360    }
361
362    fn local_get_metrics(&self, table_label: &str) -> LocalGetMetrics {
363        let get_duration = self
364            .get_duration
365            .with_guarded_label_values(&[table_label])
366            .local();
367        let get_key_size = self
368            .get_key_size
369            .with_guarded_label_values(&[table_label])
370            .local();
371        let get_value_size = self
372            .get_value_size
373            .with_guarded_label_values(&[table_label])
374            .local();
375
376        LocalGetMetrics {
377            get_duration,
378            get_key_size,
379            get_value_size,
380            report_count: 0,
381        }
382    }
383}
384
/// Handles into the iterator metrics for one `(table, iter_type)` pair.
///
/// Instances live inside the thread-local caches of `StateStoreIterStats` /
/// `StateStoreIterLogStats`. Every field except `iter_in_progress_counts` is a local handle
/// obtained via `.local()`; presumably its values only reach the shared metric on `flush`.
struct LocalIterMetricsInner {
    iter_init_duration: LabelGuardedLocalHistogram,
    iter_scan_duration: LabelGuardedLocalHistogram,
    iter_counts: LabelGuardedLocalIntCounter,
    iter_item: LabelGuardedLocalHistogram,
    iter_size: LabelGuardedLocalHistogram,
    // Direct (non-local) gauge: updated immediately on inc/dec, never flushed.
    iter_in_progress_counts: LabelGuardedIntGauge,
}
393
/// Thread-local metrics for plain (`"iter"`) iterators of one table.
struct LocalIterMetrics {
    inner: LocalIterMetricsInner,
    // Reports since the last flush; see `may_flush`.
    report_count: usize,
}
398
399impl LocalIterMetrics {
400    fn may_flush(&mut self) {
401        self.report_count += 1;
402        if self.report_count > MAX_FLUSH_TIMES {
403            self.inner.flush();
404            self.report_count = 0;
405        }
406    }
407}
408
409impl LocalIterMetricsInner {
410    fn flush(&mut self) {
411        self.iter_scan_duration.flush();
412        self.iter_init_duration.flush();
413        self.iter_counts.flush();
414        self.iter_item.flush();
415        self.iter_size.flush();
416    }
417}
418
/// Thread-local `get` metric handles for one table (cached by
/// `MonitoredStateStoreGetStats::report`).
struct LocalGetMetrics {
    get_duration: LabelGuardedLocalHistogram,
    get_key_size: LabelGuardedLocalHistogram,
    get_value_size: LabelGuardedLocalHistogram,
    // Reports since the last flush; see `may_flush`.
    report_count: usize,
}
425
426impl LocalGetMetrics {
427    fn may_flush(&mut self) {
428        self.report_count += 1;
429        if self.report_count > MAX_FLUSH_TIMES {
430            self.get_duration.flush();
431            self.get_key_size.flush();
432            self.get_value_size.flush();
433            self.report_count = 0;
434        }
435    }
436}
437
/// Thread-local metrics for `"iter_log"` iterators of one table, including per-op-type
/// counters.
struct LocalIterLogMetrics {
    iter_metrics: LocalIterMetricsInner,
    // Local counters labelled op_type = "INSERT" / "UPDATE" / "DELETE".
    insert_count: LabelGuardedLocalIntCounter,
    update_count: LabelGuardedLocalIntCounter,
    delete_count: LabelGuardedLocalIntCounter,
    // Reports since the last flush; see `may_flush`.
    report_count: usize,
}
445
446impl LocalIterLogMetrics {
447    fn may_flush(&mut self) {
448        self.report_count += 1;
449        if self.report_count > MAX_FLUSH_TIMES {
450            self.iter_metrics.flush();
451            self.insert_count.flush();
452            self.update_count.flush();
453            self.delete_count.flush();
454            self.report_count = 0;
455        }
456    }
457}
458
/// Per-iterator statistics collector used by `MonitoredStateStoreIterStats`.
///
/// An implementor is created with the time spent initializing the iterator (`new`), sees
/// every yielded item (`observe`), and pushes its totals into the metrics in `report`. The
/// `Drop` impl of `MonitoredStateStoreIterStats` calls `report`.
pub(crate) trait StateStoreIterStatsTrait: Send {
    type Item: IterItem;
    fn new(
        table_id: TableId,
        metrics: &MonitoredStorageMetrics,
        iter_init_duration: Duration,
    ) -> Self;
    fn observe(&mut self, item: <Self::Item as IterItem>::ItemRef<'_>);
    fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics);
}
469
/// Reports a thread-local metrics group may absorb before `may_flush` flushes it; the flush
/// happens on the report that pushes the count above this value.
const MAX_FLUSH_TIMES: usize = 64;
471
/// Totals accumulated over the lifetime of a single iterator.
struct StateStoreIterStatsInner {
    // Initialization time supplied when the stats were created.
    pub iter_init_duration: Duration,
    // Creation instant; the scan duration is measured from here at report time.
    pub iter_scan_time: Instant,
    // Items observed so far.
    pub total_items: usize,
    // Sum of key and value byte lengths observed so far.
    pub total_size: usize,
}
478
479impl StateStoreIterStatsInner {
480    fn new(iter_init_duration: Duration) -> Self {
481        Self {
482            iter_init_duration,
483            iter_scan_time: Instant::now(),
484            total_items: 0,
485            total_size: 0,
486        }
487    }
488}
489
/// Owns an iterator's stats collector and reports it to `metrics` when dropped.
pub(crate) struct MonitoredStateStoreIterStats<S: StateStoreIterStatsTrait> {
    pub inner: S,
    pub table_id: TableId,
    pub metrics: Arc<MonitoredStorageMetrics>,
}
495
496impl<S: StateStoreIterStatsTrait> Drop for MonitoredStateStoreIterStats<S> {
497    fn drop(&mut self) {
498        self.inner.report(self.table_id, &self.metrics)
499    }
500}
501
/// Stats collector for plain state-store iterators (reported with `iter_type = "iter"`).
pub(crate) struct StateStoreIterStats {
    inner: StateStoreIterStatsInner,
}
505
506impl StateStoreIterStats {
507    fn for_table_metrics(
508        table_id: TableId,
509        global_metrics: &MonitoredStorageMetrics,
510        f: impl FnOnce(&mut LocalIterMetrics),
511    ) {
512        thread_local!(static LOCAL_ITER_METRICS: RefCell<HashMap<TableId, LocalIterMetrics>> = RefCell::new(HashMap::default()));
513        LOCAL_ITER_METRICS.with_borrow_mut(|local_metrics| {
514            let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
515                let table_label = table_id.to_string();
516                global_metrics.local_iter_metrics(&table_label)
517            });
518            f(table_metrics)
519        });
520    }
521}
522
523impl StateStoreIterStatsTrait for StateStoreIterStats {
524    type Item = StateStoreKeyedRow;
525
526    fn new(
527        table_id: TableId,
528        metrics: &MonitoredStorageMetrics,
529        iter_init_duration: Duration,
530    ) -> Self {
531        Self::for_table_metrics(table_id, metrics, |metrics| {
532            metrics.inner.iter_in_progress_counts.inc();
533        });
534        Self {
535            inner: StateStoreIterStatsInner::new(iter_init_duration),
536        }
537    }
538
539    fn observe(&mut self, (key, value): StateStoreKeyedRowRef<'_>) {
540        self.inner.total_items += 1;
541        self.inner.total_size += key.encoded_len() + value.len();
542    }
543
544    fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics) {
545        Self::for_table_metrics(table_id, metrics, |table_metrics| {
546            self.inner.apply_to_local(&mut table_metrics.inner);
547            table_metrics.may_flush();
548        });
549    }
550}
551
552impl StateStoreIterStatsInner {
553    fn apply_to_local(&self, table_metrics: &mut LocalIterMetricsInner) {
554        {
555            let iter_scan_duration = self.iter_scan_time.elapsed();
556            table_metrics
557                .iter_scan_duration
558                .observe(iter_scan_duration.as_secs_f64());
559            table_metrics
560                .iter_init_duration
561                .observe(self.iter_init_duration.as_secs_f64());
562            table_metrics.iter_counts.inc();
563            table_metrics.iter_item.observe(self.total_items as f64);
564            table_metrics.iter_size.observe(self.total_size as f64);
565            table_metrics.iter_in_progress_counts.dec();
566        }
567    }
568}
569
/// Stats collector for change-log iterators (reported with `iter_type = "iter_log"`).
pub(crate) struct StateStoreIterLogStats {
    inner: StateStoreIterStatsInner,
    // Per-op-type item counts accumulated by `observe`.
    insert_count: u64,
    update_count: u64,
    delete_count: u64,
}
576
577impl StateStoreIterLogStats {
578    fn for_table_metrics(
579        table_id: TableId,
580        global_metrics: &MonitoredStorageMetrics,
581        f: impl FnOnce(&mut LocalIterLogMetrics),
582    ) {
583        thread_local!(static LOCAL_ITER_LOG_METRICS: RefCell<HashMap<TableId, LocalIterLogMetrics>> = RefCell::new(HashMap::default()));
584        LOCAL_ITER_LOG_METRICS.with_borrow_mut(|local_metrics| {
585            let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
586                let table_label = table_id.to_string();
587                global_metrics.local_iter_log_metrics(&table_label)
588            });
589            f(table_metrics)
590        });
591    }
592}
593
594impl StateStoreIterStatsTrait for StateStoreIterLogStats {
595    type Item = StateStoreReadLogItem;
596
597    fn new(
598        table_id: TableId,
599        metrics: &MonitoredStorageMetrics,
600        iter_init_duration: Duration,
601    ) -> Self {
602        Self::for_table_metrics(table_id, metrics, |metrics| {
603            metrics.iter_metrics.iter_in_progress_counts.inc();
604        });
605        Self {
606            inner: StateStoreIterStatsInner::new(iter_init_duration),
607            insert_count: 0,
608            update_count: 0,
609            delete_count: 0,
610        }
611    }
612
613    fn observe(&mut self, (key, change_log): StateStoreReadLogItemRef<'_>) {
614        self.inner.total_items += 1;
615        let value_len = match change_log {
616            ChangeLogValue::Insert(value) => {
617                self.insert_count += 1;
618                value.len()
619            }
620            ChangeLogValue::Update {
621                old_value,
622                new_value,
623            } => {
624                self.update_count += 1;
625                old_value.len() + new_value.len()
626            }
627            ChangeLogValue::Delete(value) => {
628                self.delete_count += 1;
629                value.len()
630            }
631        };
632        self.inner.total_size += key.len() + value_len;
633    }
634
635    fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics) {
636        Self::for_table_metrics(table_id, metrics, |table_metrics| {
637            self.inner.apply_to_local(&mut table_metrics.iter_metrics);
638            table_metrics.insert_count.inc_by(self.insert_count);
639            table_metrics.update_count.inc_by(self.update_count);
640            table_metrics.delete_count.inc_by(self.delete_count);
641            table_metrics.may_flush();
642        });
643    }
644}
645
/// Measurements for a single `get`, recorded into per-thread metrics by `report`.
pub(crate) struct MonitoredStateStoreGetStats {
    // Start instant of the get; `report` observes the time elapsed since then.
    pub get_duration: Instant,
    // Starts at 0; presumably set by the caller before `report` — verify at call sites.
    pub get_key_size: usize,
    // Starts at 0; only observed by `report` when non-zero.
    pub get_value_size: usize,
    pub table_id: TableId,
    pub metrics: Arc<MonitoredStorageMetrics>,
}
653
654impl MonitoredStateStoreGetStats {
655    pub(crate) fn new(table_id: TableId, metrics: Arc<MonitoredStorageMetrics>) -> Self {
656        Self {
657            get_duration: Instant::now(),
658            get_key_size: 0,
659            get_value_size: 0,
660            table_id,
661            metrics,
662        }
663    }
664
665    pub(crate) fn report(&self) {
666        thread_local!(static LOCAL_GET_METRICS: RefCell<HashMap<TableId, LocalGetMetrics>> = RefCell::new(HashMap::default()));
667        LOCAL_GET_METRICS.with_borrow_mut(|local_metrics| {
668            let table_metrics = local_metrics.entry(self.table_id).or_insert_with(|| {
669                let table_label = self.table_id.to_string();
670                self.metrics.local_get_metrics(&table_label)
671            });
672            let get_duration = self.get_duration.elapsed();
673            table_metrics
674                .get_duration
675                .observe(get_duration.as_secs_f64());
676            table_metrics.get_key_size.observe(self.get_key_size as _);
677            if self.get_value_size > 0 {
678                table_metrics
679                    .get_value_size
680                    .observe(self.get_value_size as _);
681            }
682            table_metrics.may_flush();
683        });
684    }
685}