risingwave_storage/monitor/monitored_storage_metrics.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::{Arc, OnceLock};
18use std::time::{Duration, Instant};
19
20use prometheus::{
21    Histogram, Registry, exponential_buckets, histogram_opts, linear_buckets,
22    register_histogram_with_registry,
23};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::id::TableId;
26use risingwave_common::metrics::{
27    LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGauge,
28    LabelGuardedLocalHistogram, LabelGuardedLocalIntCounter, RelabeledGuardedHistogramVec,
29    RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
30};
31use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
32use risingwave_common::{
33    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
34    register_guarded_int_gauge_vec_with_registry,
35};
36
37use crate::store::{
38    ChangeLogValue, IterItem, StateStoreKeyedRow, StateStoreKeyedRowRef, StateStoreReadLogItem,
39    StateStoreReadLogItemRef,
40};
41
/// [`MonitoredStorageMetrics`] stores the performance and IO metrics of Storage.
///
/// Most per-table metrics are `Relabeled*` wrappers constructed with a [`MetricLevel`];
/// presumably they relabel/drop labels when the configured level is below the metric's
/// level (see `RelabeledGuardedHistogramVec` to confirm).
#[derive(Debug, Clone)]
pub struct MonitoredStorageMetrics {
    /// Labels: `[table_id]`. Latency of a state-store `get`, observed in seconds.
    pub get_duration: RelabeledGuardedHistogramVec,
    /// Labels: `[table_id]`. Key bytes of each `get`.
    pub get_key_size: RelabeledGuardedHistogramVec,
    /// Labels: `[table_id]`. Value bytes of each `get`; zero-sized values are not observed.
    pub get_value_size: RelabeledGuardedHistogramVec,

    // [table_id, iter_type: {"iter", "iter_log"}]
    /// Total key + value bytes yielded by one iterator.
    pub iter_size: RelabeledGuardedHistogramVec,
    /// Number of items yielded by one iterator.
    pub iter_item: RelabeledGuardedHistogramVec,
    /// Time spent creating an iterator, observed in seconds.
    pub iter_init_duration: RelabeledGuardedHistogramVec,
    /// Time from iterator stats creation until the stats are reported, in seconds.
    pub iter_scan_duration: RelabeledGuardedHistogramVec,
    /// Number of iterators whose stats have been reported.
    pub iter_counts: RelabeledGuardedIntCounterVec,
    /// Iterators created but not yet reported (incremented on creation, decremented on report).
    pub iter_in_progress_counts: RelabeledGuardedIntGaugeVec,

    // [table_id, op_type]
    /// Change-log ops seen by `iter_log` iterators; `op_type` is "INSERT", "UPDATE" or "DELETE".
    pub iter_log_op_type_counts: LabelGuardedIntCounterVec,

    // [table_id, top_n, ef_search]
    /// Latency of vector nearest-neighbour queries issued to the state store.
    pub vector_nearest_duration: LabelGuardedHistogramVec,

    /// Time spent syncing (compacting the shared buffer to remote storage); buckets span
    /// 0.01 .. 10000, presumably seconds.
    pub sync_duration: Histogram,
    /// Bytes uploaded to L0 per epoch.
    pub sync_size: Histogram,
}
66
67pub static GLOBAL_STORAGE_METRICS: OnceLock<MonitoredStorageMetrics> = OnceLock::new();
68
69pub fn global_storage_metrics(metric_level: MetricLevel) -> MonitoredStorageMetrics {
70    GLOBAL_STORAGE_METRICS
71        .get_or_init(|| MonitoredStorageMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
72        .clone()
73}
74
75impl MonitoredStorageMetrics {
76    pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
77        // 256B ~ max 64GB
78        let size_buckets = exponential_buckets(256.0, 16.0, 8).unwrap();
79        // Dedicated buckets for sync size: 16MB ~ 1TB (17 buckets, x2 growth)
80        let sync_size_buckets = exponential_buckets(16.0 * 1024.0 * 1024.0, 2.0, 17).unwrap();
81        // 10ms ~ max 2.7h
82        let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
83        // ----- get -----
84        let opts = histogram_opts!(
85            "state_store_get_key_size",
86            "Total key bytes of get that have been issued to state store",
87            size_buckets.clone()
88        );
89        let get_key_size =
90            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
91        let get_key_size = RelabeledGuardedHistogramVec::with_metric_level(
92            MetricLevel::Debug,
93            get_key_size,
94            metric_level,
95        );
96
97        let opts = histogram_opts!(
98            "state_store_get_value_size",
99            "Total value bytes that have been requested from remote storage",
100            size_buckets.clone()
101        );
102        let get_value_size =
103            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
104        let get_value_size = RelabeledGuardedHistogramVec::with_metric_level(
105            MetricLevel::Debug,
106            get_value_size,
107            metric_level,
108        );
109
110        let mut buckets = exponential_buckets(0.000004, 2.0, 4).unwrap(); // 4 ~ 32us
111        buckets.extend(linear_buckets(0.00006, 0.00004, 5).unwrap()); // 60 ~ 220us.
112        buckets.extend(linear_buckets(0.0003, 0.0001, 3).unwrap()); // 300 ~ 500us.
113        buckets.extend(exponential_buckets(0.001, 2.0, 5).unwrap()); // 1 ~ 16ms.
114        buckets.extend(exponential_buckets(0.05, 4.0, 5).unwrap()); // 0.05 ~ 1.28s.
115        buckets.push(16.0); // 16s
116
117        // 1ms - 100s
118        let mut state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
119        state_store_read_time_buckets.push(40.0);
120        state_store_read_time_buckets.push(100.0);
121
122        let get_duration_opts = histogram_opts!(
123            "state_store_get_duration",
124            "Total latency of get that have been issued to state store",
125            state_store_read_time_buckets.clone(),
126        );
127        let get_duration = register_guarded_histogram_vec_with_registry!(
128            get_duration_opts,
129            &["table_id"],
130            registry
131        )
132        .unwrap();
133        let get_duration = RelabeledGuardedHistogramVec::with_metric_level(
134            MetricLevel::Critical,
135            get_duration,
136            metric_level,
137        );
138
139        let opts = histogram_opts!(
140            "state_store_iter_size",
141            "Total bytes gotten from state store scan(), for calculating read throughput",
142            size_buckets.clone()
143        );
144        let iter_size = register_guarded_histogram_vec_with_registry!(
145            opts,
146            &["table_id", "iter_type"],
147            registry
148        )
149        .unwrap();
150        let iter_size = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
151            MetricLevel::Debug,
152            iter_size,
153            metric_level,
154            1,
155        );
156
157        let opts = histogram_opts!(
158            "state_store_iter_item",
159            "Total bytes gotten from state store scan(), for calculating read throughput",
160            size_buckets,
161        );
162        let iter_item = register_guarded_histogram_vec_with_registry!(
163            opts,
164            &["table_id", "iter_type"],
165            registry
166        )
167        .unwrap();
168        let iter_item = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
169            MetricLevel::Debug,
170            iter_item,
171            metric_level,
172            1,
173        );
174
175        let opts = histogram_opts!(
176            "state_store_iter_init_duration",
177            "Histogram of the time spent on iterator initialization.",
178            state_store_read_time_buckets.clone(),
179        );
180        let iter_init_duration = register_guarded_histogram_vec_with_registry!(
181            opts,
182            &["table_id", "iter_type"],
183            registry
184        )
185        .unwrap();
186        let iter_init_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
187            MetricLevel::Critical,
188            iter_init_duration,
189            metric_level,
190            1,
191        );
192
193        let opts = histogram_opts!(
194            "state_store_iter_scan_duration",
195            "Histogram of the time spent on iterator scanning.",
196            state_store_read_time_buckets.clone(),
197        );
198        let iter_scan_duration = register_guarded_histogram_vec_with_registry!(
199            opts,
200            &["table_id", "iter_type"],
201            registry
202        )
203        .unwrap();
204        let iter_scan_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
205            MetricLevel::Critical,
206            iter_scan_duration,
207            metric_level,
208            1,
209        );
210
211        let iter_counts = register_guarded_int_counter_vec_with_registry!(
212            "state_store_iter_counts",
213            "Total number of iter that have been issued to state store",
214            &["table_id", "iter_type"],
215            registry
216        )
217        .unwrap();
218        let iter_counts = RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
219            MetricLevel::Debug,
220            iter_counts,
221            metric_level,
222            1,
223        );
224
225        let iter_in_progress_counts = register_guarded_int_gauge_vec_with_registry!(
226            "state_store_iter_in_progress_counts",
227            "Total number of existing iter",
228            &["table_id", "iter_type"],
229            registry
230        )
231        .unwrap();
232        let iter_in_progress_counts = RelabeledGuardedIntGaugeVec::with_metric_level_relabel_n(
233            MetricLevel::Debug,
234            iter_in_progress_counts,
235            metric_level,
236            1,
237        );
238
239        let iter_log_op_type_counts = register_guarded_int_counter_vec_with_registry!(
240            "state_store_iter_log_op_type_counts",
241            "Counter of each op type in iter_log",
242            &["table_id", "op_type"],
243            registry
244        )
245        .unwrap();
246
247        let opts = histogram_opts!(
248            "state_store_sync_duration",
249            "Histogram of time spent on compacting shared buffer to remote storage",
250            time_buckets,
251        );
252        let sync_duration = register_histogram_with_registry!(opts, registry).unwrap();
253
254        let opts = histogram_opts!(
255            "state_store_sync_size",
256            "Total size of upload to l0 every epoch",
257            sync_size_buckets,
258        );
259        let sync_size = register_histogram_with_registry!(opts, registry).unwrap();
260
261        let vector_nearest_duration_opts = histogram_opts!(
262            "state_store_vector_nearest_duration",
263            "Total latency of vector nearest that have been issued to state store",
264            state_store_read_time_buckets.clone(),
265        );
266        let vector_nearest_duration = register_guarded_histogram_vec_with_registry!(
267            vector_nearest_duration_opts,
268            &["table_id", "top_n", "ef_search"],
269            registry
270        )
271        .unwrap();
272
273        Self {
274            get_duration,
275            get_key_size,
276            get_value_size,
277            iter_size,
278            iter_item,
279            iter_init_duration,
280            iter_scan_duration,
281            iter_counts,
282            iter_in_progress_counts,
283            iter_log_op_type_counts,
284            vector_nearest_duration,
285            sync_duration,
286            sync_size,
287        }
288    }
289
290    pub fn unused() -> Self {
291        global_storage_metrics(MetricLevel::Disabled)
292    }
293
294    fn local_iter_metrics(&self, table_label: &str) -> LocalIterMetrics {
295        let inner = self.new_local_iter_metrics_inner(table_label, "iter");
296        LocalIterMetrics {
297            inner,
298            report_count: 0,
299        }
300    }
301
302    fn new_local_iter_metrics_inner(
303        &self,
304        table_label: &str,
305        iter_type: &str,
306    ) -> LocalIterMetricsInner {
307        let iter_init_duration = self
308            .iter_init_duration
309            .with_guarded_label_values(&[table_label, iter_type])
310            .local();
311        let iter_counts = self
312            .iter_counts
313            .with_guarded_label_values(&[table_label, iter_type])
314            .local();
315        let iter_scan_duration = self
316            .iter_scan_duration
317            .with_guarded_label_values(&[table_label, iter_type])
318            .local();
319        let iter_item = self
320            .iter_item
321            .with_guarded_label_values(&[table_label, iter_type])
322            .local();
323        let iter_size = self
324            .iter_size
325            .with_guarded_label_values(&[table_label, iter_type])
326            .local();
327        let iter_in_progress_counts = self
328            .iter_in_progress_counts
329            .with_guarded_label_values(&[table_label, iter_type]);
330
331        LocalIterMetricsInner {
332            iter_init_duration,
333            iter_scan_duration,
334            iter_counts,
335            iter_item,
336            iter_size,
337            iter_in_progress_counts,
338        }
339    }
340
341    fn local_iter_log_metrics(&self, table_label: &str) -> LocalIterLogMetrics {
342        let iter_metrics = self.new_local_iter_metrics_inner(table_label, "iter_log");
343        let insert_count = self
344            .iter_log_op_type_counts
345            .with_guarded_label_values(&[table_label, "INSERT"])
346            .local();
347        let update_count = self
348            .iter_log_op_type_counts
349            .with_guarded_label_values(&[table_label, "UPDATE"])
350            .local();
351        let delete_count = self
352            .iter_log_op_type_counts
353            .with_guarded_label_values(&[table_label, "DELETE"])
354            .local();
355        LocalIterLogMetrics {
356            iter_metrics,
357            insert_count,
358            update_count,
359            delete_count,
360            report_count: 0,
361        }
362    }
363
364    fn local_get_metrics(&self, table_label: &str) -> LocalGetMetrics {
365        let get_duration = self
366            .get_duration
367            .with_guarded_label_values(&[table_label])
368            .local();
369        let get_key_size = self
370            .get_key_size
371            .with_guarded_label_values(&[table_label])
372            .local();
373        let get_value_size = self
374            .get_value_size
375            .with_guarded_label_values(&[table_label])
376            .local();
377
378        LocalGetMetrics {
379            get_duration,
380            get_key_size,
381            get_value_size,
382            report_count: 0,
383        }
384    }
385}
386
387struct LocalIterMetricsInner {
388    iter_init_duration: LabelGuardedLocalHistogram,
389    iter_scan_duration: LabelGuardedLocalHistogram,
390    iter_counts: LabelGuardedLocalIntCounter,
391    iter_item: LabelGuardedLocalHistogram,
392    iter_size: LabelGuardedLocalHistogram,
393    iter_in_progress_counts: LabelGuardedIntGauge,
394}
395
396struct LocalIterMetrics {
397    inner: LocalIterMetricsInner,
398    report_count: usize,
399}
400
401impl LocalIterMetrics {
402    fn may_flush(&mut self) {
403        self.report_count += 1;
404        if self.report_count > MAX_FLUSH_TIMES {
405            self.inner.flush();
406            self.report_count = 0;
407        }
408    }
409}
410
411impl LocalIterMetricsInner {
412    fn flush(&mut self) {
413        self.iter_scan_duration.flush();
414        self.iter_init_duration.flush();
415        self.iter_counts.flush();
416        self.iter_item.flush();
417        self.iter_size.flush();
418    }
419}
420
421struct LocalGetMetrics {
422    get_duration: LabelGuardedLocalHistogram,
423    get_key_size: LabelGuardedLocalHistogram,
424    get_value_size: LabelGuardedLocalHistogram,
425    report_count: usize,
426}
427
428impl LocalGetMetrics {
429    fn may_flush(&mut self) {
430        self.report_count += 1;
431        if self.report_count > MAX_FLUSH_TIMES {
432            self.get_duration.flush();
433            self.get_key_size.flush();
434            self.get_value_size.flush();
435            self.report_count = 0;
436        }
437    }
438}
439
440struct LocalIterLogMetrics {
441    iter_metrics: LocalIterMetricsInner,
442    insert_count: LabelGuardedLocalIntCounter,
443    update_count: LabelGuardedLocalIntCounter,
444    delete_count: LabelGuardedLocalIntCounter,
445    report_count: usize,
446}
447
448impl LocalIterLogMetrics {
449    fn may_flush(&mut self) {
450        self.report_count += 1;
451        if self.report_count > MAX_FLUSH_TIMES {
452            self.iter_metrics.flush();
453            self.insert_count.flush();
454            self.update_count.flush();
455            self.delete_count.flush();
456            self.report_count = 0;
457        }
458    }
459}
460
/// Statistics collected over the lifetime of one state-store iterator and reported into
/// [`MonitoredStorageMetrics`] (`MonitoredStateStoreIterStats` calls `report` on drop).
pub(crate) trait StateStoreIterStatsTrait: Send {
    /// Item type produced by the monitored iterator.
    type Item: IterItem;
    /// Creates stats for a new iterator on `table_id`; `iter_init_duration` is the time
    /// already spent creating the iterator.
    fn new(
        table_id: TableId,
        metrics: &MonitoredStorageMetrics,
        iter_init_duration: Duration,
    ) -> Self;
    /// Accounts for one item yielded by the iterator.
    fn observe(&mut self, item: <Self::Item as IterItem>::ItemRef<'_>);
    /// Pushes the accumulated stats for `table_id` into `metrics`.
    fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics);
}

/// Reports a thread-local per-table metrics entry may accumulate without flushing; the
/// flush happens on the report that pushes the count past this value.
const MAX_FLUSH_TIMES: usize = 64;
473
/// Iterator statistics shared by the `iter` and `iter_log` stats types.
struct StateStoreIterStatsInner {
    /// Time spent creating the iterator, as supplied by the caller.
    pub iter_init_duration: Duration,
    /// Moment these stats were created; scan time is measured from here.
    pub iter_scan_time: Instant,
    /// Items observed so far.
    pub total_items: usize,
    /// Bytes (key + value) observed so far.
    pub total_size: usize,
}

impl StateStoreIterStatsInner {
    /// Starts the scan clock now, with zeroed item and byte totals.
    fn new(iter_init_duration: Duration) -> Self {
        let started_at = Instant::now();
        Self {
            iter_scan_time: started_at,
            iter_init_duration,
            total_items: 0,
            total_size: 0,
        }
    }
}
491
492pub(crate) struct MonitoredStateStoreIterStats<S: StateStoreIterStatsTrait> {
493    pub inner: S,
494    pub table_id: TableId,
495    pub metrics: Arc<MonitoredStorageMetrics>,
496}
497
498impl<S: StateStoreIterStatsTrait> Drop for MonitoredStateStoreIterStats<S> {
499    fn drop(&mut self) {
500        self.inner.report(self.table_id, &self.metrics)
501    }
502}
503
504pub(crate) struct StateStoreIterStats {
505    inner: StateStoreIterStatsInner,
506}
507
508impl StateStoreIterStats {
509    fn for_table_metrics(
510        table_id: TableId,
511        global_metrics: &MonitoredStorageMetrics,
512        f: impl FnOnce(&mut LocalIterMetrics),
513    ) {
514        thread_local!(static LOCAL_ITER_METRICS: RefCell<HashMap<TableId, LocalIterMetrics>> = RefCell::new(HashMap::default()));
515        LOCAL_ITER_METRICS.with_borrow_mut(|local_metrics| {
516            let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
517                let table_label = table_id.to_string();
518                global_metrics.local_iter_metrics(&table_label)
519            });
520            f(table_metrics)
521        });
522    }
523}
524
525impl StateStoreIterStatsTrait for StateStoreIterStats {
526    type Item = StateStoreKeyedRow;
527
528    fn new(
529        table_id: TableId,
530        metrics: &MonitoredStorageMetrics,
531        iter_init_duration: Duration,
532    ) -> Self {
533        Self::for_table_metrics(table_id, metrics, |metrics| {
534            metrics.inner.iter_in_progress_counts.inc();
535        });
536        Self {
537            inner: StateStoreIterStatsInner::new(iter_init_duration),
538        }
539    }
540
541    fn observe(&mut self, (key, value): StateStoreKeyedRowRef<'_>) {
542        self.inner.total_items += 1;
543        self.inner.total_size += key.encoded_len() + value.len();
544    }
545
546    fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics) {
547        Self::for_table_metrics(table_id, metrics, |table_metrics| {
548            self.inner.apply_to_local(&mut table_metrics.inner);
549            table_metrics.may_flush();
550        });
551    }
552}
553
554impl StateStoreIterStatsInner {
555    fn apply_to_local(&self, table_metrics: &mut LocalIterMetricsInner) {
556        {
557            let iter_scan_duration = self.iter_scan_time.elapsed();
558            table_metrics
559                .iter_scan_duration
560                .observe(iter_scan_duration.as_secs_f64());
561            table_metrics
562                .iter_init_duration
563                .observe(self.iter_init_duration.as_secs_f64());
564            table_metrics.iter_counts.inc();
565            table_metrics.iter_item.observe(self.total_items as f64);
566            table_metrics.iter_size.observe(self.total_size as f64);
567            table_metrics.iter_in_progress_counts.dec();
568        }
569    }
570}
571
572pub(crate) struct StateStoreIterLogStats {
573    inner: StateStoreIterStatsInner,
574    insert_count: u64,
575    update_count: u64,
576    delete_count: u64,
577}
578
579impl StateStoreIterLogStats {
580    fn for_table_metrics(
581        table_id: TableId,
582        global_metrics: &MonitoredStorageMetrics,
583        f: impl FnOnce(&mut LocalIterLogMetrics),
584    ) {
585        thread_local!(static LOCAL_ITER_LOG_METRICS: RefCell<HashMap<TableId, LocalIterLogMetrics>> = RefCell::new(HashMap::default()));
586        LOCAL_ITER_LOG_METRICS.with_borrow_mut(|local_metrics| {
587            let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
588                let table_label = table_id.to_string();
589                global_metrics.local_iter_log_metrics(&table_label)
590            });
591            f(table_metrics)
592        });
593    }
594}
595
596impl StateStoreIterStatsTrait for StateStoreIterLogStats {
597    type Item = StateStoreReadLogItem;
598
599    fn new(
600        table_id: TableId,
601        metrics: &MonitoredStorageMetrics,
602        iter_init_duration: Duration,
603    ) -> Self {
604        Self::for_table_metrics(table_id, metrics, |metrics| {
605            metrics.iter_metrics.iter_in_progress_counts.inc();
606        });
607        Self {
608            inner: StateStoreIterStatsInner::new(iter_init_duration),
609            insert_count: 0,
610            update_count: 0,
611            delete_count: 0,
612        }
613    }
614
615    fn observe(&mut self, (key, change_log): StateStoreReadLogItemRef<'_>) {
616        self.inner.total_items += 1;
617        let value_len = match change_log {
618            ChangeLogValue::Insert(value) => {
619                self.insert_count += 1;
620                value.len()
621            }
622            ChangeLogValue::Update {
623                old_value,
624                new_value,
625            } => {
626                self.update_count += 1;
627                old_value.len() + new_value.len()
628            }
629            ChangeLogValue::Delete(value) => {
630                self.delete_count += 1;
631                value.len()
632            }
633        };
634        self.inner.total_size += key.len() + value_len;
635    }
636
637    fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics) {
638        Self::for_table_metrics(table_id, metrics, |table_metrics| {
639            self.inner.apply_to_local(&mut table_metrics.iter_metrics);
640            table_metrics.insert_count.inc_by(self.insert_count);
641            table_metrics.update_count.inc_by(self.update_count);
642            table_metrics.delete_count.inc_by(self.delete_count);
643            table_metrics.may_flush();
644        });
645    }
646}
647
648pub(crate) struct MonitoredStateStoreGetStats {
649    pub get_duration: Instant,
650    pub get_key_size: usize,
651    pub get_value_size: usize,
652    pub table_id: TableId,
653    pub metrics: Arc<MonitoredStorageMetrics>,
654}
655
656impl MonitoredStateStoreGetStats {
657    pub(crate) fn new(table_id: TableId, metrics: Arc<MonitoredStorageMetrics>) -> Self {
658        Self {
659            get_duration: Instant::now(),
660            get_key_size: 0,
661            get_value_size: 0,
662            table_id,
663            metrics,
664        }
665    }
666
667    pub(crate) fn report(&self) {
668        thread_local!(static LOCAL_GET_METRICS: RefCell<HashMap<TableId, LocalGetMetrics>> = RefCell::new(HashMap::default()));
669        LOCAL_GET_METRICS.with_borrow_mut(|local_metrics| {
670            let table_metrics = local_metrics.entry(self.table_id).or_insert_with(|| {
671                let table_label = self.table_id.to_string();
672                self.metrics.local_get_metrics(&table_label)
673            });
674            let get_duration = self.get_duration.elapsed();
675            table_metrics
676                .get_duration
677                .observe(get_duration.as_secs_f64());
678            table_metrics.get_key_size.observe(self.get_key_size as _);
679            if self.get_value_size > 0 {
680                table_metrics
681                    .get_value_size
682                    .observe(self.get_value_size as _);
683            }
684            table_metrics.may_flush();
685        });
686    }
687}