risingwave_storage/monitor/monitored_storage_metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::{Arc, OnceLock};
18use std::time::{Duration, Instant};
19
20use prometheus::{
21    Histogram, Registry, exponential_buckets, histogram_opts, linear_buckets,
22    register_histogram_with_registry,
23};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::id::TableId;
26use risingwave_common::metrics::{
27    LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedLocalHistogram,
28    LabelGuardedLocalIntCounter, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
29    RelabeledGuardedIntGaugeVec,
30};
31use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
32use risingwave_common::{
33    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
34    register_guarded_int_gauge_vec_with_registry,
35};
36
37use crate::store::{
38    ChangeLogValue, IterItem, StateStoreKeyedRow, StateStoreKeyedRowRef, StateStoreReadLogItem,
39    StateStoreReadLogItemRef,
40};
41
/// [`MonitoredStorageMetrics`] stores the performance and IO metrics of Storage.
///
/// Per-table metrics are recorded through thread-local caches of local handles (see
/// `StateStoreIterStats`, `StateStoreIterLogStats` and `MonitoredStateStoreGetStats`),
/// which are flushed into these shared vectors in batches.
#[derive(Debug, Clone)]
pub struct MonitoredStorageMetrics {
    // [table_id]
    /// `state_store_get_duration`: latency of `get`, in seconds.
    pub get_duration: RelabeledGuardedHistogramVec,
    /// `state_store_get_key_size`: key bytes of each `get`.
    pub get_key_size: RelabeledGuardedHistogramVec,
    /// `state_store_get_value_size`: value bytes of each `get`; only observed when non-zero.
    pub get_value_size: RelabeledGuardedHistogramVec,

    // [table_id, iter_type: {"iter", "iter_log"}]
    /// `state_store_iter_size`: total key + value bytes read by one iterator.
    pub iter_size: RelabeledGuardedHistogramVec,
    /// `state_store_iter_item`: number of items read by one iterator.
    pub iter_item: RelabeledGuardedHistogramVec,
    /// `state_store_iter_init_duration`: time spent creating an iterator, in seconds.
    pub iter_init_duration: RelabeledGuardedHistogramVec,
    /// `state_store_iter_scan_duration`: time from creating an iterator's stats until
    /// they are reported, in seconds.
    pub iter_scan_duration: RelabeledGuardedHistogramVec,
    /// `state_store_iter_counts`: number of iterators whose stats have been reported.
    pub iter_counts: RelabeledGuardedIntCounterVec,
    /// `state_store_iter_in_progress_counts`: iterators whose stats were created but
    /// not yet reported.
    pub iter_in_progress_counts: RelabeledGuardedIntGaugeVec,

    // [table_id, op_type]
    /// `state_store_iter_log_op_type_counts`: `INSERT` / `UPDATE` / `DELETE` entries
    /// read by `iter_log`. Unlike the vectors above, it is not relabeled by metric level.
    pub iter_log_op_type_counts: LabelGuardedIntCounterVec,

    /// `state_store_sync_duration` (not per-table); not recorded in this file.
    pub sync_duration: Histogram,
    /// `state_store_sync_size` (not per-table); not recorded in this file.
    pub sync_size: Histogram,
}
63
/// Process-wide storage metrics, created lazily on the first call to
/// `global_storage_metrics`.
pub static GLOBAL_STORAGE_METRICS: OnceLock<MonitoredStorageMetrics> = OnceLock::new();
65
66pub fn global_storage_metrics(metric_level: MetricLevel) -> MonitoredStorageMetrics {
67    GLOBAL_STORAGE_METRICS
68        .get_or_init(|| MonitoredStorageMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
69        .clone()
70}
71
72impl MonitoredStorageMetrics {
73    pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
74        // 256B ~ max 64GB
75        let size_buckets = exponential_buckets(256.0, 16.0, 8).unwrap();
76        // 10ms ~ max 2.7h
77        let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
78        // ----- get -----
79        let opts = histogram_opts!(
80            "state_store_get_key_size",
81            "Total key bytes of get that have been issued to state store",
82            size_buckets.clone()
83        );
84        let get_key_size =
85            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
86        let get_key_size = RelabeledGuardedHistogramVec::with_metric_level(
87            MetricLevel::Debug,
88            get_key_size,
89            metric_level,
90        );
91
92        let opts = histogram_opts!(
93            "state_store_get_value_size",
94            "Total value bytes that have been requested from remote storage",
95            size_buckets.clone()
96        );
97        let get_value_size =
98            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
99        let get_value_size = RelabeledGuardedHistogramVec::with_metric_level(
100            MetricLevel::Debug,
101            get_value_size,
102            metric_level,
103        );
104
105        let mut buckets = exponential_buckets(0.000004, 2.0, 4).unwrap(); // 4 ~ 32us
106        buckets.extend(linear_buckets(0.00006, 0.00004, 5).unwrap()); // 60 ~ 220us.
107        buckets.extend(linear_buckets(0.0003, 0.0001, 3).unwrap()); // 300 ~ 500us.
108        buckets.extend(exponential_buckets(0.001, 2.0, 5).unwrap()); // 1 ~ 16ms.
109        buckets.extend(exponential_buckets(0.05, 4.0, 5).unwrap()); // 0.05 ~ 1.28s.
110        buckets.push(16.0); // 16s
111
112        // 1ms - 100s
113        let mut state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
114        state_store_read_time_buckets.push(40.0);
115        state_store_read_time_buckets.push(100.0);
116
117        let get_duration_opts = histogram_opts!(
118            "state_store_get_duration",
119            "Total latency of get that have been issued to state store",
120            state_store_read_time_buckets.clone(),
121        );
122        let get_duration = register_guarded_histogram_vec_with_registry!(
123            get_duration_opts,
124            &["table_id"],
125            registry
126        )
127        .unwrap();
128        let get_duration = RelabeledGuardedHistogramVec::with_metric_level(
129            MetricLevel::Critical,
130            get_duration,
131            metric_level,
132        );
133
134        let opts = histogram_opts!(
135            "state_store_iter_size",
136            "Total bytes gotten from state store scan(), for calculating read throughput",
137            size_buckets.clone()
138        );
139        let iter_size = register_guarded_histogram_vec_with_registry!(
140            opts,
141            &["table_id", "iter_type"],
142            registry
143        )
144        .unwrap();
145        let iter_size = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
146            MetricLevel::Debug,
147            iter_size,
148            metric_level,
149            1,
150        );
151
152        let opts = histogram_opts!(
153            "state_store_iter_item",
154            "Total bytes gotten from state store scan(), for calculating read throughput",
155            size_buckets.clone(),
156        );
157        let iter_item = register_guarded_histogram_vec_with_registry!(
158            opts,
159            &["table_id", "iter_type"],
160            registry
161        )
162        .unwrap();
163        let iter_item = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
164            MetricLevel::Debug,
165            iter_item,
166            metric_level,
167            1,
168        );
169
170        let opts = histogram_opts!(
171            "state_store_iter_init_duration",
172            "Histogram of the time spent on iterator initialization.",
173            state_store_read_time_buckets.clone(),
174        );
175        let iter_init_duration = register_guarded_histogram_vec_with_registry!(
176            opts,
177            &["table_id", "iter_type"],
178            registry
179        )
180        .unwrap();
181        let iter_init_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
182            MetricLevel::Critical,
183            iter_init_duration,
184            metric_level,
185            1,
186        );
187
188        let opts = histogram_opts!(
189            "state_store_iter_scan_duration",
190            "Histogram of the time spent on iterator scanning.",
191            state_store_read_time_buckets.clone(),
192        );
193        let iter_scan_duration = register_guarded_histogram_vec_with_registry!(
194            opts,
195            &["table_id", "iter_type"],
196            registry
197        )
198        .unwrap();
199        let iter_scan_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
200            MetricLevel::Critical,
201            iter_scan_duration,
202            metric_level,
203            1,
204        );
205
206        let iter_counts = register_guarded_int_counter_vec_with_registry!(
207            "state_store_iter_counts",
208            "Total number of iter that have been issued to state store",
209            &["table_id", "iter_type"],
210            registry
211        )
212        .unwrap();
213        let iter_counts = RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
214            MetricLevel::Debug,
215            iter_counts,
216            metric_level,
217            1,
218        );
219
220        let iter_in_progress_counts = register_guarded_int_gauge_vec_with_registry!(
221            "state_store_iter_in_progress_counts",
222            "Total number of existing iter",
223            &["table_id", "iter_type"],
224            registry
225        )
226        .unwrap();
227        let iter_in_progress_counts = RelabeledGuardedIntGaugeVec::with_metric_level_relabel_n(
228            MetricLevel::Debug,
229            iter_in_progress_counts,
230            metric_level,
231            1,
232        );
233
234        let iter_log_op_type_counts = register_guarded_int_counter_vec_with_registry!(
235            "state_store_iter_log_op_type_counts",
236            "Counter of each op type in iter_log",
237            &["table_id", "op_type"],
238            registry
239        )
240        .unwrap();
241
242        let opts = histogram_opts!(
243            "state_store_sync_duration",
244            "Histogram of time spent on compacting shared buffer to remote storage",
245            time_buckets,
246        );
247        let sync_duration = register_histogram_with_registry!(opts, registry).unwrap();
248
249        let opts = histogram_opts!(
250            "state_store_sync_size",
251            "Total size of upload to l0 every epoch",
252            size_buckets,
253        );
254        let sync_size = register_histogram_with_registry!(opts, registry).unwrap();
255
256        Self {
257            get_duration,
258            get_key_size,
259            get_value_size,
260            iter_size,
261            iter_item,
262            iter_init_duration,
263            iter_scan_duration,
264            iter_counts,
265            iter_in_progress_counts,
266            iter_log_op_type_counts,
267            sync_duration,
268            sync_size,
269        }
270    }
271
272    pub fn unused() -> Self {
273        global_storage_metrics(MetricLevel::Disabled)
274    }
275
276    fn local_iter_metrics(&self, table_label: &str) -> LocalIterMetrics {
277        let inner = self.new_local_iter_metrics_inner(table_label, "iter");
278        LocalIterMetrics {
279            inner,
280            report_count: 0,
281        }
282    }
283
284    fn new_local_iter_metrics_inner(
285        &self,
286        table_label: &str,
287        iter_type: &str,
288    ) -> LocalIterMetricsInner {
289        let iter_init_duration = self
290            .iter_init_duration
291            .with_guarded_label_values(&[table_label, iter_type])
292            .local();
293        let iter_counts = self
294            .iter_counts
295            .with_guarded_label_values(&[table_label, iter_type])
296            .local();
297        let iter_scan_duration = self
298            .iter_scan_duration
299            .with_guarded_label_values(&[table_label, iter_type])
300            .local();
301        let iter_item = self
302            .iter_item
303            .with_guarded_label_values(&[table_label, iter_type])
304            .local();
305        let iter_size = self
306            .iter_size
307            .with_guarded_label_values(&[table_label, iter_type])
308            .local();
309        let iter_in_progress_counts = self
310            .iter_in_progress_counts
311            .with_guarded_label_values(&[table_label, iter_type]);
312
313        LocalIterMetricsInner {
314            iter_init_duration,
315            iter_scan_duration,
316            iter_counts,
317            iter_item,
318            iter_size,
319            iter_in_progress_counts,
320        }
321    }
322
323    fn local_iter_log_metrics(&self, table_label: &str) -> LocalIterLogMetrics {
324        let iter_metrics = self.new_local_iter_metrics_inner(table_label, "iter_log");
325        let insert_count = self
326            .iter_log_op_type_counts
327            .with_guarded_label_values(&[table_label, "INSERT"])
328            .local();
329        let update_count = self
330            .iter_log_op_type_counts
331            .with_guarded_label_values(&[table_label, "UPDATE"])
332            .local();
333        let delete_count = self
334            .iter_log_op_type_counts
335            .with_guarded_label_values(&[table_label, "DELETE"])
336            .local();
337        LocalIterLogMetrics {
338            iter_metrics,
339            insert_count,
340            update_count,
341            delete_count,
342            report_count: 0,
343        }
344    }
345
346    fn local_get_metrics(&self, table_label: &str) -> LocalGetMetrics {
347        let get_duration = self
348            .get_duration
349            .with_guarded_label_values(&[table_label])
350            .local();
351        let get_key_size = self
352            .get_key_size
353            .with_guarded_label_values(&[table_label])
354            .local();
355        let get_value_size = self
356            .get_value_size
357            .with_guarded_label_values(&[table_label])
358            .local();
359
360        LocalGetMetrics {
361            get_duration,
362            get_key_size,
363            get_value_size,
364            report_count: 0,
365        }
366    }
367}
368
/// Per-table iterator metric handles for one `iter_type` (`"iter"` or `"iter_log"`),
/// built by `MonitoredStorageMetrics::new_local_iter_metrics_inner` and cached in
/// thread-local maps by the iterator stats types.
struct LocalIterMetricsInner {
    /// Local handle to `state_store_iter_init_duration`; needs `flush`.
    iter_init_duration: LabelGuardedLocalHistogram,
    /// Local handle to `state_store_iter_scan_duration`; needs `flush`.
    iter_scan_duration: LabelGuardedLocalHistogram,
    /// Local handle to `state_store_iter_counts`; needs `flush`.
    iter_counts: LabelGuardedLocalIntCounter,
    /// Local handle to `state_store_iter_item`; needs `flush`.
    iter_item: LabelGuardedLocalHistogram,
    /// Local handle to `state_store_iter_size`; needs `flush`.
    iter_size: LabelGuardedLocalHistogram,
    /// `state_store_iter_in_progress_counts` gauge. It is not a `.local()` handle, so
    /// `flush` skips it.
    iter_in_progress_counts: LabelGuardedIntGauge,
}
377
/// Per-table metrics for plain iterators, cached per thread by
/// `StateStoreIterStats::for_table_metrics`.
struct LocalIterMetrics {
    /// Handles labeled with `iter_type = "iter"`.
    inner: LocalIterMetricsInner,
    /// Reports recorded since the last flush; see `may_flush`.
    report_count: usize,
}
382
383impl LocalIterMetrics {
384    fn may_flush(&mut self) {
385        self.report_count += 1;
386        if self.report_count > MAX_FLUSH_TIMES {
387            self.inner.flush();
388            self.report_count = 0;
389        }
390    }
391}
392
393impl LocalIterMetricsInner {
394    fn flush(&mut self) {
395        self.iter_scan_duration.flush();
396        self.iter_init_duration.flush();
397        self.iter_counts.flush();
398        self.iter_item.flush();
399        self.iter_size.flush();
400    }
401}
402
/// Per-table `get` metric handles, cached per thread by
/// `MonitoredStateStoreGetStats::report`.
struct LocalGetMetrics {
    /// Local handle to `state_store_get_duration`.
    get_duration: LabelGuardedLocalHistogram,
    /// Local handle to `state_store_get_key_size`.
    get_key_size: LabelGuardedLocalHistogram,
    /// Local handle to `state_store_get_value_size`.
    get_value_size: LabelGuardedLocalHistogram,
    /// Reports recorded since the last flush; see `may_flush`.
    report_count: usize,
}
409
410impl LocalGetMetrics {
411    fn may_flush(&mut self) {
412        self.report_count += 1;
413        if self.report_count > MAX_FLUSH_TIMES {
414            self.get_duration.flush();
415            self.get_key_size.flush();
416            self.get_value_size.flush();
417            self.report_count = 0;
418        }
419    }
420}
421
/// Per-table metrics for `iter_log` iterators, cached per thread by
/// `StateStoreIterLogStats::for_table_metrics`.
struct LocalIterLogMetrics {
    /// Handles labeled with `iter_type = "iter_log"`.
    iter_metrics: LocalIterMetricsInner,
    /// Local counter for `op_type = "INSERT"`.
    insert_count: LabelGuardedLocalIntCounter,
    /// Local counter for `op_type = "UPDATE"`.
    update_count: LabelGuardedLocalIntCounter,
    /// Local counter for `op_type = "DELETE"`.
    delete_count: LabelGuardedLocalIntCounter,
    /// Reports recorded since the last flush; see `may_flush`.
    report_count: usize,
}
429
430impl LocalIterLogMetrics {
431    fn may_flush(&mut self) {
432        self.report_count += 1;
433        if self.report_count > MAX_FLUSH_TIMES {
434            self.iter_metrics.flush();
435            self.insert_count.flush();
436            self.update_count.flush();
437            self.delete_count.flush();
438            self.report_count = 0;
439        }
440    }
441}
442
/// Statistics collected while one iterator is consumed. `MonitoredStateStoreIterStats`
/// reports them to the per-table metrics when it is dropped.
pub(crate) trait StateStoreIterStatsTrait: Send {
    /// Item type yielded by the monitored iterator.
    type Item: IterItem;
    /// Starts the stats for a new iterator on `table_id`. `iter_init_duration` is the
    /// time already spent creating the iterator.
    fn new(
        table_id: TableId,
        metrics: &MonitoredStorageMetrics,
        iter_init_duration: Duration,
    ) -> Self;
    /// Accounts for one item yielded by the iterator.
    fn observe(&mut self, item: <Self::Item as IterItem>::ItemRef<'_>);
    /// Records the accumulated statistics into the metrics for `table_id`.
    fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics);
}
453
/// Reports a thread-local metrics cache entry absorbs before it is flushed. The flush
/// happens on the report that takes the entry's `report_count` past this value.
const MAX_FLUSH_TIMES: usize = 64;
455
/// Statistics shared by both iterator kinds (`iter` and `iter_log`).
struct StateStoreIterStatsInner {
    /// Time spent creating the iterator, as supplied by the caller.
    pub iter_init_duration: Duration,
    /// When these stats were created; the scan duration is measured from here.
    pub iter_scan_time: Instant,
    /// Number of items observed so far.
    pub total_items: usize,
    /// Key plus value bytes observed so far.
    pub total_size: usize,
}
462
463impl StateStoreIterStatsInner {
464    fn new(iter_init_duration: Duration) -> Self {
465        Self {
466            iter_init_duration,
467            iter_scan_time: Instant::now(),
468            total_items: 0,
469            total_size: 0,
470        }
471    }
472}
473
/// Holds the per-iterator stats `inner` and reports them for `table_id` to `metrics`
/// when dropped.
pub(crate) struct MonitoredStateStoreIterStats<S: StateStoreIterStatsTrait> {
    pub inner: S,
    pub table_id: TableId,
    pub metrics: Arc<MonitoredStorageMetrics>,
}
479
480impl<S: StateStoreIterStatsTrait> Drop for MonitoredStateStoreIterStats<S> {
481    fn drop(&mut self) {
482        self.inner.report(self.table_id, &self.metrics)
483    }
484}
485
/// Statistics for a plain (`iter_type = "iter"`) state store iterator.
pub(crate) struct StateStoreIterStats {
    inner: StateStoreIterStatsInner,
}
489
490impl StateStoreIterStats {
491    fn for_table_metrics(
492        table_id: TableId,
493        global_metrics: &MonitoredStorageMetrics,
494        f: impl FnOnce(&mut LocalIterMetrics),
495    ) {
496        thread_local!(static LOCAL_ITER_METRICS: RefCell<HashMap<TableId, LocalIterMetrics>> = RefCell::new(HashMap::default()));
497        LOCAL_ITER_METRICS.with_borrow_mut(|local_metrics| {
498            let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
499                let table_label = table_id.to_string();
500                global_metrics.local_iter_metrics(&table_label)
501            });
502            f(table_metrics)
503        });
504    }
505}
506
507impl StateStoreIterStatsTrait for StateStoreIterStats {
508    type Item = StateStoreKeyedRow;
509
510    fn new(
511        table_id: TableId,
512        metrics: &MonitoredStorageMetrics,
513        iter_init_duration: Duration,
514    ) -> Self {
515        Self::for_table_metrics(table_id, metrics, |metrics| {
516            metrics.inner.iter_in_progress_counts.inc();
517        });
518        Self {
519            inner: StateStoreIterStatsInner::new(iter_init_duration),
520        }
521    }
522
523    fn observe(&mut self, (key, value): StateStoreKeyedRowRef<'_>) {
524        self.inner.total_items += 1;
525        self.inner.total_size += key.encoded_len() + value.len();
526    }
527
528    fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics) {
529        Self::for_table_metrics(table_id, metrics, |table_metrics| {
530            self.inner.apply_to_local(&mut table_metrics.inner);
531            table_metrics.may_flush();
532        });
533    }
534}
535
536impl StateStoreIterStatsInner {
537    fn apply_to_local(&self, table_metrics: &mut LocalIterMetricsInner) {
538        {
539            let iter_scan_duration = self.iter_scan_time.elapsed();
540            table_metrics
541                .iter_scan_duration
542                .observe(iter_scan_duration.as_secs_f64());
543            table_metrics
544                .iter_init_duration
545                .observe(self.iter_init_duration.as_secs_f64());
546            table_metrics.iter_counts.inc();
547            table_metrics.iter_item.observe(self.total_items as f64);
548            table_metrics.iter_size.observe(self.total_size as f64);
549            table_metrics.iter_in_progress_counts.dec();
550        }
551    }
552}
553
/// Statistics for an `iter_log` (change-log) iterator, including per-op-type counts.
pub(crate) struct StateStoreIterLogStats {
    inner: StateStoreIterStatsInner,
    /// `ChangeLogValue::Insert` entries observed.
    insert_count: u64,
    /// `ChangeLogValue::Update` entries observed.
    update_count: u64,
    /// `ChangeLogValue::Delete` entries observed.
    delete_count: u64,
}
560
561impl StateStoreIterLogStats {
562    fn for_table_metrics(
563        table_id: TableId,
564        global_metrics: &MonitoredStorageMetrics,
565        f: impl FnOnce(&mut LocalIterLogMetrics),
566    ) {
567        thread_local!(static LOCAL_ITER_LOG_METRICS: RefCell<HashMap<TableId, LocalIterLogMetrics>> = RefCell::new(HashMap::default()));
568        LOCAL_ITER_LOG_METRICS.with_borrow_mut(|local_metrics| {
569            let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
570                let table_label = table_id.to_string();
571                global_metrics.local_iter_log_metrics(&table_label)
572            });
573            f(table_metrics)
574        });
575    }
576}
577
578impl StateStoreIterStatsTrait for StateStoreIterLogStats {
579    type Item = StateStoreReadLogItem;
580
581    fn new(
582        table_id: TableId,
583        metrics: &MonitoredStorageMetrics,
584        iter_init_duration: Duration,
585    ) -> Self {
586        Self::for_table_metrics(table_id, metrics, |metrics| {
587            metrics.iter_metrics.iter_in_progress_counts.inc();
588        });
589        Self {
590            inner: StateStoreIterStatsInner::new(iter_init_duration),
591            insert_count: 0,
592            update_count: 0,
593            delete_count: 0,
594        }
595    }
596
597    fn observe(&mut self, (key, change_log): StateStoreReadLogItemRef<'_>) {
598        self.inner.total_items += 1;
599        let value_len = match change_log {
600            ChangeLogValue::Insert(value) => {
601                self.insert_count += 1;
602                value.len()
603            }
604            ChangeLogValue::Update {
605                old_value,
606                new_value,
607            } => {
608                self.update_count += 1;
609                old_value.len() + new_value.len()
610            }
611            ChangeLogValue::Delete(value) => {
612                self.delete_count += 1;
613                value.len()
614            }
615        };
616        self.inner.total_size += key.len() + value_len;
617    }
618
619    fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics) {
620        Self::for_table_metrics(table_id, metrics, |table_metrics| {
621            self.inner.apply_to_local(&mut table_metrics.iter_metrics);
622            table_metrics.insert_count.inc_by(self.insert_count);
623            table_metrics.update_count.inc_by(self.update_count);
624            table_metrics.delete_count.inc_by(self.delete_count);
625            table_metrics.may_flush();
626        });
627    }
628}
629
/// Measurements for a single `get` on `table_id`, recorded by `report`.
pub(crate) struct MonitoredStateStoreGetStats {
    /// Start instant of the `get`. Despite the name, this is an `Instant`, and the
    /// duration is computed in `report`.
    pub get_duration: Instant,
    /// Key bytes of the request.
    pub get_key_size: usize,
    /// Value bytes returned; `report` skips the value-size histogram when this is 0.
    pub get_value_size: usize,
    pub table_id: TableId,
    pub metrics: Arc<MonitoredStorageMetrics>,
}
637
638impl MonitoredStateStoreGetStats {
639    pub(crate) fn new(table_id: TableId, metrics: Arc<MonitoredStorageMetrics>) -> Self {
640        Self {
641            get_duration: Instant::now(),
642            get_key_size: 0,
643            get_value_size: 0,
644            table_id,
645            metrics,
646        }
647    }
648
649    pub(crate) fn report(&self) {
650        thread_local!(static LOCAL_GET_METRICS: RefCell<HashMap<TableId, LocalGetMetrics>> = RefCell::new(HashMap::default()));
651        LOCAL_GET_METRICS.with_borrow_mut(|local_metrics| {
652            let table_metrics = local_metrics.entry(self.table_id).or_insert_with(|| {
653                let table_label = self.table_id.to_string();
654                self.metrics.local_get_metrics(&table_label)
655            });
656            let get_duration = self.get_duration.elapsed();
657            table_metrics
658                .get_duration
659                .observe(get_duration.as_secs_f64());
660            table_metrics.get_key_size.observe(self.get_key_size as _);
661            if self.get_value_size > 0 {
662                table_metrics
663                    .get_value_size
664                    .observe(self.get_value_size as _);
665            }
666            table_metrics.may_flush();
667        });
668    }
669}