1use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::{Arc, OnceLock};
18use std::time::{Duration, Instant};
19
20use prometheus::{
21 Histogram, Registry, exponential_buckets, histogram_opts, linear_buckets,
22 register_histogram_with_registry,
23};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::id::TableId;
26use risingwave_common::metrics::{
27 LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGauge,
28 LabelGuardedLocalHistogram, LabelGuardedLocalIntCounter, RelabeledGuardedHistogramVec,
29 RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec,
30};
31use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
32use risingwave_common::{
33 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
34 register_guarded_int_gauge_vec_with_registry,
35};
36
37use crate::store::{
38 ChangeLogValue, IterItem, StateStoreKeyedRow, StateStoreKeyedRowRef, StateStoreReadLogItem,
39 StateStoreReadLogItemRef,
40};
41
42#[derive(Debug, Clone)]
44pub struct MonitoredStorageMetrics {
45 pub get_duration: RelabeledGuardedHistogramVec,
46 pub get_key_size: RelabeledGuardedHistogramVec,
47 pub get_value_size: RelabeledGuardedHistogramVec,
48
49 pub iter_size: RelabeledGuardedHistogramVec,
51 pub iter_item: RelabeledGuardedHistogramVec,
52 pub iter_init_duration: RelabeledGuardedHistogramVec,
53 pub iter_scan_duration: RelabeledGuardedHistogramVec,
54 pub iter_counts: RelabeledGuardedIntCounterVec,
55 pub iter_in_progress_counts: RelabeledGuardedIntGaugeVec,
56
57 pub iter_log_op_type_counts: LabelGuardedIntCounterVec,
59
60 pub vector_nearest_duration: LabelGuardedHistogramVec,
62
63 pub sync_duration: Histogram,
64 pub sync_size: Histogram,
65}
66
67pub static GLOBAL_STORAGE_METRICS: OnceLock<MonitoredStorageMetrics> = OnceLock::new();
68
69pub fn global_storage_metrics(metric_level: MetricLevel) -> MonitoredStorageMetrics {
70 GLOBAL_STORAGE_METRICS
71 .get_or_init(|| MonitoredStorageMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
72 .clone()
73}
74
75impl MonitoredStorageMetrics {
76 pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
77 let size_buckets = exponential_buckets(256.0, 16.0, 8).unwrap();
79 let sync_size_buckets = exponential_buckets(16.0 * 1024.0 * 1024.0, 2.0, 17).unwrap();
81 let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
83 let opts = histogram_opts!(
85 "state_store_get_key_size",
86 "Total key bytes of get that have been issued to state store",
87 size_buckets.clone()
88 );
89 let get_key_size =
90 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
91 let get_key_size = RelabeledGuardedHistogramVec::with_metric_level(
92 MetricLevel::Debug,
93 get_key_size,
94 metric_level,
95 );
96
97 let opts = histogram_opts!(
98 "state_store_get_value_size",
99 "Total value bytes that have been requested from remote storage",
100 size_buckets.clone()
101 );
102 let get_value_size =
103 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
104 let get_value_size = RelabeledGuardedHistogramVec::with_metric_level(
105 MetricLevel::Debug,
106 get_value_size,
107 metric_level,
108 );
109
110 let mut buckets = exponential_buckets(0.000004, 2.0, 4).unwrap(); buckets.extend(linear_buckets(0.00006, 0.00004, 5).unwrap()); buckets.extend(linear_buckets(0.0003, 0.0001, 3).unwrap()); buckets.extend(exponential_buckets(0.001, 2.0, 5).unwrap()); buckets.extend(exponential_buckets(0.05, 4.0, 5).unwrap()); buckets.push(16.0); let mut state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
119 state_store_read_time_buckets.push(40.0);
120 state_store_read_time_buckets.push(100.0);
121
122 let get_duration_opts = histogram_opts!(
123 "state_store_get_duration",
124 "Total latency of get that have been issued to state store",
125 state_store_read_time_buckets.clone(),
126 );
127 let get_duration = register_guarded_histogram_vec_with_registry!(
128 get_duration_opts,
129 &["table_id"],
130 registry
131 )
132 .unwrap();
133 let get_duration = RelabeledGuardedHistogramVec::with_metric_level(
134 MetricLevel::Critical,
135 get_duration,
136 metric_level,
137 );
138
139 let opts = histogram_opts!(
140 "state_store_iter_size",
141 "Total bytes gotten from state store scan(), for calculating read throughput",
142 size_buckets.clone()
143 );
144 let iter_size = register_guarded_histogram_vec_with_registry!(
145 opts,
146 &["table_id", "iter_type"],
147 registry
148 )
149 .unwrap();
150 let iter_size = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
151 MetricLevel::Debug,
152 iter_size,
153 metric_level,
154 1,
155 );
156
157 let opts = histogram_opts!(
158 "state_store_iter_item",
159 "Total bytes gotten from state store scan(), for calculating read throughput",
160 size_buckets,
161 );
162 let iter_item = register_guarded_histogram_vec_with_registry!(
163 opts,
164 &["table_id", "iter_type"],
165 registry
166 )
167 .unwrap();
168 let iter_item = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
169 MetricLevel::Debug,
170 iter_item,
171 metric_level,
172 1,
173 );
174
175 let opts = histogram_opts!(
176 "state_store_iter_init_duration",
177 "Histogram of the time spent on iterator initialization.",
178 state_store_read_time_buckets.clone(),
179 );
180 let iter_init_duration = register_guarded_histogram_vec_with_registry!(
181 opts,
182 &["table_id", "iter_type"],
183 registry
184 )
185 .unwrap();
186 let iter_init_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
187 MetricLevel::Critical,
188 iter_init_duration,
189 metric_level,
190 1,
191 );
192
193 let opts = histogram_opts!(
194 "state_store_iter_scan_duration",
195 "Histogram of the time spent on iterator scanning.",
196 state_store_read_time_buckets.clone(),
197 );
198 let iter_scan_duration = register_guarded_histogram_vec_with_registry!(
199 opts,
200 &["table_id", "iter_type"],
201 registry
202 )
203 .unwrap();
204 let iter_scan_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
205 MetricLevel::Critical,
206 iter_scan_duration,
207 metric_level,
208 1,
209 );
210
211 let iter_counts = register_guarded_int_counter_vec_with_registry!(
212 "state_store_iter_counts",
213 "Total number of iter that have been issued to state store",
214 &["table_id", "iter_type"],
215 registry
216 )
217 .unwrap();
218 let iter_counts = RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
219 MetricLevel::Debug,
220 iter_counts,
221 metric_level,
222 1,
223 );
224
225 let iter_in_progress_counts = register_guarded_int_gauge_vec_with_registry!(
226 "state_store_iter_in_progress_counts",
227 "Total number of existing iter",
228 &["table_id", "iter_type"],
229 registry
230 )
231 .unwrap();
232 let iter_in_progress_counts = RelabeledGuardedIntGaugeVec::with_metric_level_relabel_n(
233 MetricLevel::Debug,
234 iter_in_progress_counts,
235 metric_level,
236 1,
237 );
238
239 let iter_log_op_type_counts = register_guarded_int_counter_vec_with_registry!(
240 "state_store_iter_log_op_type_counts",
241 "Counter of each op type in iter_log",
242 &["table_id", "op_type"],
243 registry
244 )
245 .unwrap();
246
247 let opts = histogram_opts!(
248 "state_store_sync_duration",
249 "Histogram of time spent on compacting shared buffer to remote storage",
250 time_buckets,
251 );
252 let sync_duration = register_histogram_with_registry!(opts, registry).unwrap();
253
254 let opts = histogram_opts!(
255 "state_store_sync_size",
256 "Total size of upload to l0 every epoch",
257 sync_size_buckets,
258 );
259 let sync_size = register_histogram_with_registry!(opts, registry).unwrap();
260
261 let vector_nearest_duration_opts = histogram_opts!(
262 "state_store_vector_nearest_duration",
263 "Total latency of vector nearest that have been issued to state store",
264 state_store_read_time_buckets.clone(),
265 );
266 let vector_nearest_duration = register_guarded_histogram_vec_with_registry!(
267 vector_nearest_duration_opts,
268 &["table_id", "top_n", "ef_search"],
269 registry
270 )
271 .unwrap();
272
273 Self {
274 get_duration,
275 get_key_size,
276 get_value_size,
277 iter_size,
278 iter_item,
279 iter_init_duration,
280 iter_scan_duration,
281 iter_counts,
282 iter_in_progress_counts,
283 iter_log_op_type_counts,
284 vector_nearest_duration,
285 sync_duration,
286 sync_size,
287 }
288 }
289
290 pub fn unused() -> Self {
291 global_storage_metrics(MetricLevel::Disabled)
292 }
293
294 fn local_iter_metrics(&self, table_label: &str) -> LocalIterMetrics {
295 let inner = self.new_local_iter_metrics_inner(table_label, "iter");
296 LocalIterMetrics {
297 inner,
298 report_count: 0,
299 }
300 }
301
302 fn new_local_iter_metrics_inner(
303 &self,
304 table_label: &str,
305 iter_type: &str,
306 ) -> LocalIterMetricsInner {
307 let iter_init_duration = self
308 .iter_init_duration
309 .with_guarded_label_values(&[table_label, iter_type])
310 .local();
311 let iter_counts = self
312 .iter_counts
313 .with_guarded_label_values(&[table_label, iter_type])
314 .local();
315 let iter_scan_duration = self
316 .iter_scan_duration
317 .with_guarded_label_values(&[table_label, iter_type])
318 .local();
319 let iter_item = self
320 .iter_item
321 .with_guarded_label_values(&[table_label, iter_type])
322 .local();
323 let iter_size = self
324 .iter_size
325 .with_guarded_label_values(&[table_label, iter_type])
326 .local();
327 let iter_in_progress_counts = self
328 .iter_in_progress_counts
329 .with_guarded_label_values(&[table_label, iter_type]);
330
331 LocalIterMetricsInner {
332 iter_init_duration,
333 iter_scan_duration,
334 iter_counts,
335 iter_item,
336 iter_size,
337 iter_in_progress_counts,
338 }
339 }
340
341 fn local_iter_log_metrics(&self, table_label: &str) -> LocalIterLogMetrics {
342 let iter_metrics = self.new_local_iter_metrics_inner(table_label, "iter_log");
343 let insert_count = self
344 .iter_log_op_type_counts
345 .with_guarded_label_values(&[table_label, "INSERT"])
346 .local();
347 let update_count = self
348 .iter_log_op_type_counts
349 .with_guarded_label_values(&[table_label, "UPDATE"])
350 .local();
351 let delete_count = self
352 .iter_log_op_type_counts
353 .with_guarded_label_values(&[table_label, "DELETE"])
354 .local();
355 LocalIterLogMetrics {
356 iter_metrics,
357 insert_count,
358 update_count,
359 delete_count,
360 report_count: 0,
361 }
362 }
363
364 fn local_get_metrics(&self, table_label: &str) -> LocalGetMetrics {
365 let get_duration = self
366 .get_duration
367 .with_guarded_label_values(&[table_label])
368 .local();
369 let get_key_size = self
370 .get_key_size
371 .with_guarded_label_values(&[table_label])
372 .local();
373 let get_value_size = self
374 .get_value_size
375 .with_guarded_label_values(&[table_label])
376 .local();
377
378 LocalGetMetrics {
379 get_duration,
380 get_key_size,
381 get_value_size,
382 report_count: 0,
383 }
384 }
385}
386
387struct LocalIterMetricsInner {
388 iter_init_duration: LabelGuardedLocalHistogram,
389 iter_scan_duration: LabelGuardedLocalHistogram,
390 iter_counts: LabelGuardedLocalIntCounter,
391 iter_item: LabelGuardedLocalHistogram,
392 iter_size: LabelGuardedLocalHistogram,
393 iter_in_progress_counts: LabelGuardedIntGauge,
394}
395
396struct LocalIterMetrics {
397 inner: LocalIterMetricsInner,
398 report_count: usize,
399}
400
401impl LocalIterMetrics {
402 fn may_flush(&mut self) {
403 self.report_count += 1;
404 if self.report_count > MAX_FLUSH_TIMES {
405 self.inner.flush();
406 self.report_count = 0;
407 }
408 }
409}
410
411impl LocalIterMetricsInner {
412 fn flush(&mut self) {
413 self.iter_scan_duration.flush();
414 self.iter_init_duration.flush();
415 self.iter_counts.flush();
416 self.iter_item.flush();
417 self.iter_size.flush();
418 }
419}
420
421struct LocalGetMetrics {
422 get_duration: LabelGuardedLocalHistogram,
423 get_key_size: LabelGuardedLocalHistogram,
424 get_value_size: LabelGuardedLocalHistogram,
425 report_count: usize,
426}
427
428impl LocalGetMetrics {
429 fn may_flush(&mut self) {
430 self.report_count += 1;
431 if self.report_count > MAX_FLUSH_TIMES {
432 self.get_duration.flush();
433 self.get_key_size.flush();
434 self.get_value_size.flush();
435 self.report_count = 0;
436 }
437 }
438}
439
440struct LocalIterLogMetrics {
441 iter_metrics: LocalIterMetricsInner,
442 insert_count: LabelGuardedLocalIntCounter,
443 update_count: LabelGuardedLocalIntCounter,
444 delete_count: LabelGuardedLocalIntCounter,
445 report_count: usize,
446}
447
448impl LocalIterLogMetrics {
449 fn may_flush(&mut self) {
450 self.report_count += 1;
451 if self.report_count > MAX_FLUSH_TIMES {
452 self.iter_metrics.flush();
453 self.insert_count.flush();
454 self.update_count.flush();
455 self.delete_count.flush();
456 self.report_count = 0;
457 }
458 }
459}
460
461pub(crate) trait StateStoreIterStatsTrait: Send {
462 type Item: IterItem;
463 fn new(
464 table_id: TableId,
465 metrics: &MonitoredStorageMetrics,
466 iter_init_duration: Duration,
467 ) -> Self;
468 fn observe(&mut self, item: <Self::Item as IterItem>::ItemRef<'_>);
469 fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics);
470}
471
/// Number of reports a per-thread metrics entry may accumulate before `may_flush` pushes its
/// local handles to the shared metrics. The flush fires on the report that exceeds this count.
const MAX_FLUSH_TIMES: usize = 64;
473
/// Raw counters accumulated over the lifetime of a single iterator.
struct StateStoreIterStatsInner {
    /// Time the caller spent creating the iterator.
    pub iter_init_duration: Duration,
    /// Instant at which scanning started; the scan duration is measured from here.
    pub iter_scan_time: Instant,
    /// Number of items observed so far.
    pub total_items: usize,
    /// Bytes observed so far (key plus value lengths).
    pub total_size: usize,
}

impl StateStoreIterStatsInner {
    /// Creates zeroed counters and starts the scan clock now.
    fn new(iter_init_duration: Duration) -> Self {
        let iter_scan_time = Instant::now();
        Self {
            total_items: 0,
            total_size: 0,
            iter_init_duration,
            iter_scan_time,
        }
    }
}
491
492pub(crate) struct MonitoredStateStoreIterStats<S: StateStoreIterStatsTrait> {
493 pub inner: S,
494 pub table_id: TableId,
495 pub metrics: Arc<MonitoredStorageMetrics>,
496}
497
498impl<S: StateStoreIterStatsTrait> Drop for MonitoredStateStoreIterStats<S> {
499 fn drop(&mut self) {
500 self.inner.report(self.table_id, &self.metrics)
501 }
502}
503
504pub(crate) struct StateStoreIterStats {
505 inner: StateStoreIterStatsInner,
506}
507
508impl StateStoreIterStats {
509 fn for_table_metrics(
510 table_id: TableId,
511 global_metrics: &MonitoredStorageMetrics,
512 f: impl FnOnce(&mut LocalIterMetrics),
513 ) {
514 thread_local!(static LOCAL_ITER_METRICS: RefCell<HashMap<TableId, LocalIterMetrics>> = RefCell::new(HashMap::default()));
515 LOCAL_ITER_METRICS.with_borrow_mut(|local_metrics| {
516 let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
517 let table_label = table_id.to_string();
518 global_metrics.local_iter_metrics(&table_label)
519 });
520 f(table_metrics)
521 });
522 }
523}
524
525impl StateStoreIterStatsTrait for StateStoreIterStats {
526 type Item = StateStoreKeyedRow;
527
528 fn new(
529 table_id: TableId,
530 metrics: &MonitoredStorageMetrics,
531 iter_init_duration: Duration,
532 ) -> Self {
533 Self::for_table_metrics(table_id, metrics, |metrics| {
534 metrics.inner.iter_in_progress_counts.inc();
535 });
536 Self {
537 inner: StateStoreIterStatsInner::new(iter_init_duration),
538 }
539 }
540
541 fn observe(&mut self, (key, value): StateStoreKeyedRowRef<'_>) {
542 self.inner.total_items += 1;
543 self.inner.total_size += key.encoded_len() + value.len();
544 }
545
546 fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics) {
547 Self::for_table_metrics(table_id, metrics, |table_metrics| {
548 self.inner.apply_to_local(&mut table_metrics.inner);
549 table_metrics.may_flush();
550 });
551 }
552}
553
554impl StateStoreIterStatsInner {
555 fn apply_to_local(&self, table_metrics: &mut LocalIterMetricsInner) {
556 {
557 let iter_scan_duration = self.iter_scan_time.elapsed();
558 table_metrics
559 .iter_scan_duration
560 .observe(iter_scan_duration.as_secs_f64());
561 table_metrics
562 .iter_init_duration
563 .observe(self.iter_init_duration.as_secs_f64());
564 table_metrics.iter_counts.inc();
565 table_metrics.iter_item.observe(self.total_items as f64);
566 table_metrics.iter_size.observe(self.total_size as f64);
567 table_metrics.iter_in_progress_counts.dec();
568 }
569 }
570}
571
572pub(crate) struct StateStoreIterLogStats {
573 inner: StateStoreIterStatsInner,
574 insert_count: u64,
575 update_count: u64,
576 delete_count: u64,
577}
578
579impl StateStoreIterLogStats {
580 fn for_table_metrics(
581 table_id: TableId,
582 global_metrics: &MonitoredStorageMetrics,
583 f: impl FnOnce(&mut LocalIterLogMetrics),
584 ) {
585 thread_local!(static LOCAL_ITER_LOG_METRICS: RefCell<HashMap<TableId, LocalIterLogMetrics>> = RefCell::new(HashMap::default()));
586 LOCAL_ITER_LOG_METRICS.with_borrow_mut(|local_metrics| {
587 let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
588 let table_label = table_id.to_string();
589 global_metrics.local_iter_log_metrics(&table_label)
590 });
591 f(table_metrics)
592 });
593 }
594}
595
596impl StateStoreIterStatsTrait for StateStoreIterLogStats {
597 type Item = StateStoreReadLogItem;
598
599 fn new(
600 table_id: TableId,
601 metrics: &MonitoredStorageMetrics,
602 iter_init_duration: Duration,
603 ) -> Self {
604 Self::for_table_metrics(table_id, metrics, |metrics| {
605 metrics.iter_metrics.iter_in_progress_counts.inc();
606 });
607 Self {
608 inner: StateStoreIterStatsInner::new(iter_init_duration),
609 insert_count: 0,
610 update_count: 0,
611 delete_count: 0,
612 }
613 }
614
615 fn observe(&mut self, (key, change_log): StateStoreReadLogItemRef<'_>) {
616 self.inner.total_items += 1;
617 let value_len = match change_log {
618 ChangeLogValue::Insert(value) => {
619 self.insert_count += 1;
620 value.len()
621 }
622 ChangeLogValue::Update {
623 old_value,
624 new_value,
625 } => {
626 self.update_count += 1;
627 old_value.len() + new_value.len()
628 }
629 ChangeLogValue::Delete(value) => {
630 self.delete_count += 1;
631 value.len()
632 }
633 };
634 self.inner.total_size += key.len() + value_len;
635 }
636
637 fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics) {
638 Self::for_table_metrics(table_id, metrics, |table_metrics| {
639 self.inner.apply_to_local(&mut table_metrics.iter_metrics);
640 table_metrics.insert_count.inc_by(self.insert_count);
641 table_metrics.update_count.inc_by(self.update_count);
642 table_metrics.delete_count.inc_by(self.delete_count);
643 table_metrics.may_flush();
644 });
645 }
646}
647
648pub(crate) struct MonitoredStateStoreGetStats {
649 pub get_duration: Instant,
650 pub get_key_size: usize,
651 pub get_value_size: usize,
652 pub table_id: TableId,
653 pub metrics: Arc<MonitoredStorageMetrics>,
654}
655
656impl MonitoredStateStoreGetStats {
657 pub(crate) fn new(table_id: TableId, metrics: Arc<MonitoredStorageMetrics>) -> Self {
658 Self {
659 get_duration: Instant::now(),
660 get_key_size: 0,
661 get_value_size: 0,
662 table_id,
663 metrics,
664 }
665 }
666
667 pub(crate) fn report(&self) {
668 thread_local!(static LOCAL_GET_METRICS: RefCell<HashMap<TableId, LocalGetMetrics>> = RefCell::new(HashMap::default()));
669 LOCAL_GET_METRICS.with_borrow_mut(|local_metrics| {
670 let table_metrics = local_metrics.entry(self.table_id).or_insert_with(|| {
671 let table_label = self.table_id.to_string();
672 self.metrics.local_get_metrics(&table_label)
673 });
674 let get_duration = self.get_duration.elapsed();
675 table_metrics
676 .get_duration
677 .observe(get_duration.as_secs_f64());
678 table_metrics.get_key_size.observe(self.get_key_size as _);
679 if self.get_value_size > 0 {
680 table_metrics
681 .get_value_size
682 .observe(self.get_value_size as _);
683 }
684 table_metrics.may_flush();
685 });
686 }
687}