1use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::{Arc, OnceLock};
18use std::time::{Duration, Instant};
19
20use prometheus::{
21 Histogram, Registry, exponential_buckets, histogram_opts, linear_buckets,
22 register_histogram_with_registry,
23};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::id::TableId;
26use risingwave_common::metrics::{
27 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGauge,
28 LabelGuardedLocalHistogram, LabelGuardedLocalIntCounter, RelabeledGuardedHistogramVec,
29 RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
30};
31use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
32use risingwave_common::{
33 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
34 register_guarded_int_gauge_vec_with_registry,
35};
36
37use crate::store::{
38 ChangeLogValue, IterItem, StateStoreKeyedRow, StateStoreKeyedRowRef, StateStoreReadLogItem,
39 StateStoreReadLogItemRef,
40};
41
42#[derive(Debug, Clone)]
44pub struct MonitoredStorageMetrics {
45 pub get_duration: RelabeledGuardedHistogramVec,
46 pub get_key_size: RelabeledGuardedHistogramVec,
47 pub get_value_size: RelabeledGuardedHistogramVec,
48
49 pub iter_size: RelabeledGuardedHistogramVec,
51 pub iter_item: RelabeledGuardedHistogramVec,
52 pub iter_init_duration: RelabeledGuardedHistogramVec,
53 pub iter_scan_duration: RelabeledGuardedHistogramVec,
54 pub iter_counts: RelabeledGuardedIntCounterVec,
55 pub iter_in_progress_counts: RelabeledGuardedIntGaugeVec,
56
57 pub iter_log_op_type_counts: LabelGuardedIntCounterVec,
59
60 pub vector_nearest_duration: LabelGuardedHistogramVec,
62
63 pub sync_duration: Histogram,
64 pub sync_size: Histogram,
65}
66
67pub static GLOBAL_STORAGE_METRICS: OnceLock<MonitoredStorageMetrics> = OnceLock::new();
68
69pub fn global_storage_metrics(metric_level: MetricLevel) -> MonitoredStorageMetrics {
70 GLOBAL_STORAGE_METRICS
71 .get_or_init(|| MonitoredStorageMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
72 .clone()
73}
74
75impl MonitoredStorageMetrics {
76 pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
77 let size_buckets = exponential_buckets(256.0, 16.0, 8).unwrap();
79 let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
81 let opts = histogram_opts!(
83 "state_store_get_key_size",
84 "Total key bytes of get that have been issued to state store",
85 size_buckets.clone()
86 );
87 let get_key_size =
88 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
89 let get_key_size = RelabeledGuardedHistogramVec::with_metric_level(
90 MetricLevel::Debug,
91 get_key_size,
92 metric_level,
93 );
94
95 let opts = histogram_opts!(
96 "state_store_get_value_size",
97 "Total value bytes that have been requested from remote storage",
98 size_buckets.clone()
99 );
100 let get_value_size =
101 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
102 let get_value_size = RelabeledGuardedHistogramVec::with_metric_level(
103 MetricLevel::Debug,
104 get_value_size,
105 metric_level,
106 );
107
108 let mut buckets = exponential_buckets(0.000004, 2.0, 4).unwrap(); buckets.extend(linear_buckets(0.00006, 0.00004, 5).unwrap()); buckets.extend(linear_buckets(0.0003, 0.0001, 3).unwrap()); buckets.extend(exponential_buckets(0.001, 2.0, 5).unwrap()); buckets.extend(exponential_buckets(0.05, 4.0, 5).unwrap()); buckets.push(16.0); let mut state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
117 state_store_read_time_buckets.push(40.0);
118 state_store_read_time_buckets.push(100.0);
119
120 let get_duration_opts = histogram_opts!(
121 "state_store_get_duration",
122 "Total latency of get that have been issued to state store",
123 state_store_read_time_buckets.clone(),
124 );
125 let get_duration = register_guarded_histogram_vec_with_registry!(
126 get_duration_opts,
127 &["table_id"],
128 registry
129 )
130 .unwrap();
131 let get_duration = RelabeledGuardedHistogramVec::with_metric_level(
132 MetricLevel::Critical,
133 get_duration,
134 metric_level,
135 );
136
137 let opts = histogram_opts!(
138 "state_store_iter_size",
139 "Total bytes gotten from state store scan(), for calculating read throughput",
140 size_buckets.clone()
141 );
142 let iter_size = register_guarded_histogram_vec_with_registry!(
143 opts,
144 &["table_id", "iter_type"],
145 registry
146 )
147 .unwrap();
148 let iter_size = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
149 MetricLevel::Debug,
150 iter_size,
151 metric_level,
152 1,
153 );
154
155 let opts = histogram_opts!(
156 "state_store_iter_item",
157 "Total bytes gotten from state store scan(), for calculating read throughput",
158 size_buckets.clone(),
159 );
160 let iter_item = register_guarded_histogram_vec_with_registry!(
161 opts,
162 &["table_id", "iter_type"],
163 registry
164 )
165 .unwrap();
166 let iter_item = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
167 MetricLevel::Debug,
168 iter_item,
169 metric_level,
170 1,
171 );
172
173 let opts = histogram_opts!(
174 "state_store_iter_init_duration",
175 "Histogram of the time spent on iterator initialization.",
176 state_store_read_time_buckets.clone(),
177 );
178 let iter_init_duration = register_guarded_histogram_vec_with_registry!(
179 opts,
180 &["table_id", "iter_type"],
181 registry
182 )
183 .unwrap();
184 let iter_init_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
185 MetricLevel::Critical,
186 iter_init_duration,
187 metric_level,
188 1,
189 );
190
191 let opts = histogram_opts!(
192 "state_store_iter_scan_duration",
193 "Histogram of the time spent on iterator scanning.",
194 state_store_read_time_buckets.clone(),
195 );
196 let iter_scan_duration = register_guarded_histogram_vec_with_registry!(
197 opts,
198 &["table_id", "iter_type"],
199 registry
200 )
201 .unwrap();
202 let iter_scan_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
203 MetricLevel::Critical,
204 iter_scan_duration,
205 metric_level,
206 1,
207 );
208
209 let iter_counts = register_guarded_int_counter_vec_with_registry!(
210 "state_store_iter_counts",
211 "Total number of iter that have been issued to state store",
212 &["table_id", "iter_type"],
213 registry
214 )
215 .unwrap();
216 let iter_counts = RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
217 MetricLevel::Debug,
218 iter_counts,
219 metric_level,
220 1,
221 );
222
223 let iter_in_progress_counts = register_guarded_int_gauge_vec_with_registry!(
224 "state_store_iter_in_progress_counts",
225 "Total number of existing iter",
226 &["table_id", "iter_type"],
227 registry
228 )
229 .unwrap();
230 let iter_in_progress_counts = RelabeledGuardedIntGaugeVec::with_metric_level_relabel_n(
231 MetricLevel::Debug,
232 iter_in_progress_counts,
233 metric_level,
234 1,
235 );
236
237 let iter_log_op_type_counts = register_guarded_int_counter_vec_with_registry!(
238 "state_store_iter_log_op_type_counts",
239 "Counter of each op type in iter_log",
240 &["table_id", "op_type"],
241 registry
242 )
243 .unwrap();
244
245 let opts = histogram_opts!(
246 "state_store_sync_duration",
247 "Histogram of time spent on compacting shared buffer to remote storage",
248 time_buckets,
249 );
250 let sync_duration = register_histogram_with_registry!(opts, registry).unwrap();
251
252 let opts = histogram_opts!(
253 "state_store_sync_size",
254 "Total size of upload to l0 every epoch",
255 size_buckets,
256 );
257 let sync_size = register_histogram_with_registry!(opts, registry).unwrap();
258
259 let vector_nearest_duration_opts = histogram_opts!(
260 "state_store_vector_nearest_duration",
261 "Total latency of vector nearest that have been issued to state store",
262 state_store_read_time_buckets.clone(),
263 );
264 let vector_nearest_duration = register_guarded_histogram_vec_with_registry!(
265 vector_nearest_duration_opts,
266 &["table_id", "top_n", "ef_search"],
267 registry
268 )
269 .unwrap();
270
271 Self {
272 get_duration,
273 get_key_size,
274 get_value_size,
275 iter_size,
276 iter_item,
277 iter_init_duration,
278 iter_scan_duration,
279 iter_counts,
280 iter_in_progress_counts,
281 iter_log_op_type_counts,
282 vector_nearest_duration,
283 sync_duration,
284 sync_size,
285 }
286 }
287
288 pub fn unused() -> Self {
289 global_storage_metrics(MetricLevel::Disabled)
290 }
291
292 fn local_iter_metrics(&self, table_label: &str) -> LocalIterMetrics {
293 let inner = self.new_local_iter_metrics_inner(table_label, "iter");
294 LocalIterMetrics {
295 inner,
296 report_count: 0,
297 }
298 }
299
300 fn new_local_iter_metrics_inner(
301 &self,
302 table_label: &str,
303 iter_type: &str,
304 ) -> LocalIterMetricsInner {
305 let iter_init_duration = self
306 .iter_init_duration
307 .with_guarded_label_values(&[table_label, iter_type])
308 .local();
309 let iter_counts = self
310 .iter_counts
311 .with_guarded_label_values(&[table_label, iter_type])
312 .local();
313 let iter_scan_duration = self
314 .iter_scan_duration
315 .with_guarded_label_values(&[table_label, iter_type])
316 .local();
317 let iter_item = self
318 .iter_item
319 .with_guarded_label_values(&[table_label, iter_type])
320 .local();
321 let iter_size = self
322 .iter_size
323 .with_guarded_label_values(&[table_label, iter_type])
324 .local();
325 let iter_in_progress_counts = self
326 .iter_in_progress_counts
327 .with_guarded_label_values(&[table_label, iter_type]);
328
329 LocalIterMetricsInner {
330 iter_init_duration,
331 iter_scan_duration,
332 iter_counts,
333 iter_item,
334 iter_size,
335 iter_in_progress_counts,
336 }
337 }
338
339 fn local_iter_log_metrics(&self, table_label: &str) -> LocalIterLogMetrics {
340 let iter_metrics = self.new_local_iter_metrics_inner(table_label, "iter_log");
341 let insert_count = self
342 .iter_log_op_type_counts
343 .with_guarded_label_values(&[table_label, "INSERT"])
344 .local();
345 let update_count = self
346 .iter_log_op_type_counts
347 .with_guarded_label_values(&[table_label, "UPDATE"])
348 .local();
349 let delete_count = self
350 .iter_log_op_type_counts
351 .with_guarded_label_values(&[table_label, "DELETE"])
352 .local();
353 LocalIterLogMetrics {
354 iter_metrics,
355 insert_count,
356 update_count,
357 delete_count,
358 report_count: 0,
359 }
360 }
361
362 fn local_get_metrics(&self, table_label: &str) -> LocalGetMetrics {
363 let get_duration = self
364 .get_duration
365 .with_guarded_label_values(&[table_label])
366 .local();
367 let get_key_size = self
368 .get_key_size
369 .with_guarded_label_values(&[table_label])
370 .local();
371 let get_value_size = self
372 .get_value_size
373 .with_guarded_label_values(&[table_label])
374 .local();
375
376 LocalGetMetrics {
377 get_duration,
378 get_key_size,
379 get_value_size,
380 report_count: 0,
381 }
382 }
383}
384
385struct LocalIterMetricsInner {
386 iter_init_duration: LabelGuardedLocalHistogram,
387 iter_scan_duration: LabelGuardedLocalHistogram,
388 iter_counts: LabelGuardedLocalIntCounter,
389 iter_item: LabelGuardedLocalHistogram,
390 iter_size: LabelGuardedLocalHistogram,
391 iter_in_progress_counts: LabelGuardedIntGauge,
392}
393
394struct LocalIterMetrics {
395 inner: LocalIterMetricsInner,
396 report_count: usize,
397}
398
399impl LocalIterMetrics {
400 fn may_flush(&mut self) {
401 self.report_count += 1;
402 if self.report_count > MAX_FLUSH_TIMES {
403 self.inner.flush();
404 self.report_count = 0;
405 }
406 }
407}
408
409impl LocalIterMetricsInner {
410 fn flush(&mut self) {
411 self.iter_scan_duration.flush();
412 self.iter_init_duration.flush();
413 self.iter_counts.flush();
414 self.iter_item.flush();
415 self.iter_size.flush();
416 }
417}
418
419struct LocalGetMetrics {
420 get_duration: LabelGuardedLocalHistogram,
421 get_key_size: LabelGuardedLocalHistogram,
422 get_value_size: LabelGuardedLocalHistogram,
423 report_count: usize,
424}
425
426impl LocalGetMetrics {
427 fn may_flush(&mut self) {
428 self.report_count += 1;
429 if self.report_count > MAX_FLUSH_TIMES {
430 self.get_duration.flush();
431 self.get_key_size.flush();
432 self.get_value_size.flush();
433 self.report_count = 0;
434 }
435 }
436}
437
438struct LocalIterLogMetrics {
439 iter_metrics: LocalIterMetricsInner,
440 insert_count: LabelGuardedLocalIntCounter,
441 update_count: LabelGuardedLocalIntCounter,
442 delete_count: LabelGuardedLocalIntCounter,
443 report_count: usize,
444}
445
446impl LocalIterLogMetrics {
447 fn may_flush(&mut self) {
448 self.report_count += 1;
449 if self.report_count > MAX_FLUSH_TIMES {
450 self.iter_metrics.flush();
451 self.insert_count.flush();
452 self.update_count.flush();
453 self.delete_count.flush();
454 self.report_count = 0;
455 }
456 }
457}
458
459pub(crate) trait StateStoreIterStatsTrait: Send {
460 type Item: IterItem;
461 fn new(
462 table_id: TableId,
463 metrics: &MonitoredStorageMetrics,
464 iter_init_duration: Duration,
465 ) -> Self;
466 fn observe(&mut self, item: <Self::Item as IterItem>::ItemRef<'_>);
467 fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics);
468}
469
/// Threshold on the number of reports a thread-local metrics entry buffers between flushes.
///
/// The `may_flush` helpers flush on the report that pushes the count above this value.
const MAX_FLUSH_TIMES: usize = 64;
471
/// Counters shared by all iterator-stat kinds.
struct StateStoreIterStatsInner {
    /// Time already spent initialising the iterator.
    pub iter_init_duration: Duration,
    /// Moment the stats were created; the scan duration is measured from here.
    pub iter_scan_time: Instant,
    /// Items observed so far.
    pub total_items: usize,
    /// Bytes observed so far.
    pub total_size: usize,
}

impl StateStoreIterStatsInner {
    /// Creates zeroed counters and starts the scan clock.
    fn new(iter_init_duration: Duration) -> Self {
        let iter_scan_time = Instant::now();
        Self {
            total_items: 0,
            total_size: 0,
            iter_init_duration,
            iter_scan_time,
        }
    }
}
489
490pub(crate) struct MonitoredStateStoreIterStats<S: StateStoreIterStatsTrait> {
491 pub inner: S,
492 pub table_id: TableId,
493 pub metrics: Arc<MonitoredStorageMetrics>,
494}
495
496impl<S: StateStoreIterStatsTrait> Drop for MonitoredStateStoreIterStats<S> {
497 fn drop(&mut self) {
498 self.inner.report(self.table_id, &self.metrics)
499 }
500}
501
502pub(crate) struct StateStoreIterStats {
503 inner: StateStoreIterStatsInner,
504}
505
506impl StateStoreIterStats {
507 fn for_table_metrics(
508 table_id: TableId,
509 global_metrics: &MonitoredStorageMetrics,
510 f: impl FnOnce(&mut LocalIterMetrics),
511 ) {
512 thread_local!(static LOCAL_ITER_METRICS: RefCell<HashMap<TableId, LocalIterMetrics>> = RefCell::new(HashMap::default()));
513 LOCAL_ITER_METRICS.with_borrow_mut(|local_metrics| {
514 let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
515 let table_label = table_id.to_string();
516 global_metrics.local_iter_metrics(&table_label)
517 });
518 f(table_metrics)
519 });
520 }
521}
522
523impl StateStoreIterStatsTrait for StateStoreIterStats {
524 type Item = StateStoreKeyedRow;
525
526 fn new(
527 table_id: TableId,
528 metrics: &MonitoredStorageMetrics,
529 iter_init_duration: Duration,
530 ) -> Self {
531 Self::for_table_metrics(table_id, metrics, |metrics| {
532 metrics.inner.iter_in_progress_counts.inc();
533 });
534 Self {
535 inner: StateStoreIterStatsInner::new(iter_init_duration),
536 }
537 }
538
539 fn observe(&mut self, (key, value): StateStoreKeyedRowRef<'_>) {
540 self.inner.total_items += 1;
541 self.inner.total_size += key.encoded_len() + value.len();
542 }
543
544 fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics) {
545 Self::for_table_metrics(table_id, metrics, |table_metrics| {
546 self.inner.apply_to_local(&mut table_metrics.inner);
547 table_metrics.may_flush();
548 });
549 }
550}
551
552impl StateStoreIterStatsInner {
553 fn apply_to_local(&self, table_metrics: &mut LocalIterMetricsInner) {
554 {
555 let iter_scan_duration = self.iter_scan_time.elapsed();
556 table_metrics
557 .iter_scan_duration
558 .observe(iter_scan_duration.as_secs_f64());
559 table_metrics
560 .iter_init_duration
561 .observe(self.iter_init_duration.as_secs_f64());
562 table_metrics.iter_counts.inc();
563 table_metrics.iter_item.observe(self.total_items as f64);
564 table_metrics.iter_size.observe(self.total_size as f64);
565 table_metrics.iter_in_progress_counts.dec();
566 }
567 }
568}
569
570pub(crate) struct StateStoreIterLogStats {
571 inner: StateStoreIterStatsInner,
572 insert_count: u64,
573 update_count: u64,
574 delete_count: u64,
575}
576
577impl StateStoreIterLogStats {
578 fn for_table_metrics(
579 table_id: TableId,
580 global_metrics: &MonitoredStorageMetrics,
581 f: impl FnOnce(&mut LocalIterLogMetrics),
582 ) {
583 thread_local!(static LOCAL_ITER_LOG_METRICS: RefCell<HashMap<TableId, LocalIterLogMetrics>> = RefCell::new(HashMap::default()));
584 LOCAL_ITER_LOG_METRICS.with_borrow_mut(|local_metrics| {
585 let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
586 let table_label = table_id.to_string();
587 global_metrics.local_iter_log_metrics(&table_label)
588 });
589 f(table_metrics)
590 });
591 }
592}
593
594impl StateStoreIterStatsTrait for StateStoreIterLogStats {
595 type Item = StateStoreReadLogItem;
596
597 fn new(
598 table_id: TableId,
599 metrics: &MonitoredStorageMetrics,
600 iter_init_duration: Duration,
601 ) -> Self {
602 Self::for_table_metrics(table_id, metrics, |metrics| {
603 metrics.iter_metrics.iter_in_progress_counts.inc();
604 });
605 Self {
606 inner: StateStoreIterStatsInner::new(iter_init_duration),
607 insert_count: 0,
608 update_count: 0,
609 delete_count: 0,
610 }
611 }
612
613 fn observe(&mut self, (key, change_log): StateStoreReadLogItemRef<'_>) {
614 self.inner.total_items += 1;
615 let value_len = match change_log {
616 ChangeLogValue::Insert(value) => {
617 self.insert_count += 1;
618 value.len()
619 }
620 ChangeLogValue::Update {
621 old_value,
622 new_value,
623 } => {
624 self.update_count += 1;
625 old_value.len() + new_value.len()
626 }
627 ChangeLogValue::Delete(value) => {
628 self.delete_count += 1;
629 value.len()
630 }
631 };
632 self.inner.total_size += key.len() + value_len;
633 }
634
635 fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics) {
636 Self::for_table_metrics(table_id, metrics, |table_metrics| {
637 self.inner.apply_to_local(&mut table_metrics.iter_metrics);
638 table_metrics.insert_count.inc_by(self.insert_count);
639 table_metrics.update_count.inc_by(self.update_count);
640 table_metrics.delete_count.inc_by(self.delete_count);
641 table_metrics.may_flush();
642 });
643 }
644}
645
646pub(crate) struct MonitoredStateStoreGetStats {
647 pub get_duration: Instant,
648 pub get_key_size: usize,
649 pub get_value_size: usize,
650 pub table_id: TableId,
651 pub metrics: Arc<MonitoredStorageMetrics>,
652}
653
654impl MonitoredStateStoreGetStats {
655 pub(crate) fn new(table_id: TableId, metrics: Arc<MonitoredStorageMetrics>) -> Self {
656 Self {
657 get_duration: Instant::now(),
658 get_key_size: 0,
659 get_value_size: 0,
660 table_id,
661 metrics,
662 }
663 }
664
665 pub(crate) fn report(&self) {
666 thread_local!(static LOCAL_GET_METRICS: RefCell<HashMap<TableId, LocalGetMetrics>> = RefCell::new(HashMap::default()));
667 LOCAL_GET_METRICS.with_borrow_mut(|local_metrics| {
668 let table_metrics = local_metrics.entry(self.table_id).or_insert_with(|| {
669 let table_label = self.table_id.to_string();
670 self.metrics.local_get_metrics(&table_label)
671 });
672 let get_duration = self.get_duration.elapsed();
673 table_metrics
674 .get_duration
675 .observe(get_duration.as_secs_f64());
676 table_metrics.get_key_size.observe(self.get_key_size as _);
677 if self.get_value_size > 0 {
678 table_metrics
679 .get_value_size
680 .observe(self.get_value_size as _);
681 }
682 table_metrics.may_flush();
683 });
684 }
685}