1use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::{Arc, OnceLock};
18use std::time::{Duration, Instant};
19
20use prometheus::{
21 Histogram, Registry, exponential_buckets, histogram_opts, linear_buckets,
22 register_histogram_with_registry,
23};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::metrics::{
26 LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedLocalHistogram,
27 LabelGuardedLocalIntCounter, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
28 RelabeledGuardedIntGaugeVec,
29};
30use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
31use risingwave_common::{
32 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
33 register_guarded_int_gauge_vec_with_registry,
34};
35
36use crate::store::{
37 ChangeLogValue, IterItem, StateStoreKeyedRow, StateStoreKeyedRowRef, StateStoreReadLogItem,
38 StateStoreReadLogItemRef,
39};
40
/// Handles to the state-store metrics that [`MonitoredStorageMetrics::new`] registers.
///
/// The const parameter on each vector type is its number of labels: `1` for the point-get
/// metrics (`table_id`) and `2` for the iterator metrics (`table_id` plus `iter_type` or
/// `op_type`).
#[derive(Debug, Clone)]
pub struct MonitoredStorageMetrics {
    /// `state_store_get_duration`: latency of point gets, in seconds.
    pub get_duration: RelabeledGuardedHistogramVec<1>,
    /// `state_store_get_key_size`: key bytes of point gets.
    pub get_key_size: RelabeledGuardedHistogramVec<1>,
    /// `state_store_get_value_size`: value bytes of point gets.
    pub get_value_size: RelabeledGuardedHistogramVec<1>,

    /// `state_store_iter_size`: bytes read by one iterator.
    pub iter_size: RelabeledGuardedHistogramVec<2>,
    /// `state_store_iter_item`: number of items read by one iterator.
    pub iter_item: RelabeledGuardedHistogramVec<2>,
    /// `state_store_iter_init_duration`: iterator initialization time, in seconds.
    pub iter_init_duration: RelabeledGuardedHistogramVec<2>,
    /// `state_store_iter_scan_duration`: iterator scan time, in seconds.
    pub iter_scan_duration: RelabeledGuardedHistogramVec<2>,
    /// `state_store_iter_counts`: number of iterators that have been reported.
    pub iter_counts: RelabeledGuardedIntCounterVec<2>,
    /// `state_store_iter_in_progress_counts`: iterators created but not yet reported.
    pub iter_in_progress_counts: RelabeledGuardedIntGaugeVec<2>,

    /// `state_store_iter_log_op_type_counts`: change-log items seen, per `op_type`.
    pub iter_log_op_type_counts: LabelGuardedIntCounterVec<2>,

    /// `state_store_sync_duration`: unlabelled histogram of sync time.
    pub sync_duration: Histogram,
    /// `state_store_sync_size`: unlabelled histogram of sync size.
    pub sync_size: Histogram,
}
62
/// Process-wide [`MonitoredStorageMetrics`] instance; empty until the first call to
/// [`global_storage_metrics`] initializes it.
pub static GLOBAL_STORAGE_METRICS: OnceLock<MonitoredStorageMetrics> = OnceLock::new();
64
65pub fn global_storage_metrics(metric_level: MetricLevel) -> MonitoredStorageMetrics {
66 GLOBAL_STORAGE_METRICS
67 .get_or_init(|| MonitoredStorageMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
68 .clone()
69}
70
71impl MonitoredStorageMetrics {
72 pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
73 let size_buckets = exponential_buckets(256.0, 16.0, 8).unwrap();
75 let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
77 let opts = histogram_opts!(
79 "state_store_get_key_size",
80 "Total key bytes of get that have been issued to state store",
81 size_buckets.clone()
82 );
83 let get_key_size =
84 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
85 let get_key_size = RelabeledGuardedHistogramVec::with_metric_level(
86 MetricLevel::Debug,
87 get_key_size,
88 metric_level,
89 );
90
91 let opts = histogram_opts!(
92 "state_store_get_value_size",
93 "Total value bytes that have been requested from remote storage",
94 size_buckets.clone()
95 );
96 let get_value_size =
97 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
98 let get_value_size = RelabeledGuardedHistogramVec::with_metric_level(
99 MetricLevel::Debug,
100 get_value_size,
101 metric_level,
102 );
103
104 let mut buckets = exponential_buckets(0.000004, 2.0, 4).unwrap(); buckets.extend(linear_buckets(0.00006, 0.00004, 5).unwrap()); buckets.extend(linear_buckets(0.0003, 0.0001, 3).unwrap()); buckets.extend(exponential_buckets(0.001, 2.0, 5).unwrap()); buckets.extend(exponential_buckets(0.05, 4.0, 5).unwrap()); buckets.push(16.0); let mut state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
113 state_store_read_time_buckets.push(40.0);
114 state_store_read_time_buckets.push(100.0);
115
116 let get_duration_opts = histogram_opts!(
117 "state_store_get_duration",
118 "Total latency of get that have been issued to state store",
119 state_store_read_time_buckets.clone(),
120 );
121 let get_duration = register_guarded_histogram_vec_with_registry!(
122 get_duration_opts,
123 &["table_id"],
124 registry
125 )
126 .unwrap();
127 let get_duration = RelabeledGuardedHistogramVec::with_metric_level(
128 MetricLevel::Critical,
129 get_duration,
130 metric_level,
131 );
132
133 let opts = histogram_opts!(
134 "state_store_iter_size",
135 "Total bytes gotten from state store scan(), for calculating read throughput",
136 size_buckets.clone()
137 );
138 let iter_size = register_guarded_histogram_vec_with_registry!(
139 opts,
140 &["table_id", "iter_type"],
141 registry
142 )
143 .unwrap();
144 let iter_size = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
145 MetricLevel::Debug,
146 iter_size,
147 metric_level,
148 1,
149 );
150
151 let opts = histogram_opts!(
152 "state_store_iter_item",
153 "Total bytes gotten from state store scan(), for calculating read throughput",
154 size_buckets.clone(),
155 );
156 let iter_item = register_guarded_histogram_vec_with_registry!(
157 opts,
158 &["table_id", "iter_type"],
159 registry
160 )
161 .unwrap();
162 let iter_item = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
163 MetricLevel::Debug,
164 iter_item,
165 metric_level,
166 1,
167 );
168
169 let opts = histogram_opts!(
170 "state_store_iter_init_duration",
171 "Histogram of the time spent on iterator initialization.",
172 state_store_read_time_buckets.clone(),
173 );
174 let iter_init_duration = register_guarded_histogram_vec_with_registry!(
175 opts,
176 &["table_id", "iter_type"],
177 registry
178 )
179 .unwrap();
180 let iter_init_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
181 MetricLevel::Critical,
182 iter_init_duration,
183 metric_level,
184 1,
185 );
186
187 let opts = histogram_opts!(
188 "state_store_iter_scan_duration",
189 "Histogram of the time spent on iterator scanning.",
190 state_store_read_time_buckets.clone(),
191 );
192 let iter_scan_duration = register_guarded_histogram_vec_with_registry!(
193 opts,
194 &["table_id", "iter_type"],
195 registry
196 )
197 .unwrap();
198 let iter_scan_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
199 MetricLevel::Critical,
200 iter_scan_duration,
201 metric_level,
202 1,
203 );
204
205 let iter_counts = register_guarded_int_counter_vec_with_registry!(
206 "state_store_iter_counts",
207 "Total number of iter that have been issued to state store",
208 &["table_id", "iter_type"],
209 registry
210 )
211 .unwrap();
212 let iter_counts = RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
213 MetricLevel::Debug,
214 iter_counts,
215 metric_level,
216 1,
217 );
218
219 let iter_in_progress_counts = register_guarded_int_gauge_vec_with_registry!(
220 "state_store_iter_in_progress_counts",
221 "Total number of existing iter",
222 &["table_id", "iter_type"],
223 registry
224 )
225 .unwrap();
226 let iter_in_progress_counts = RelabeledGuardedIntGaugeVec::with_metric_level_relabel_n(
227 MetricLevel::Debug,
228 iter_in_progress_counts,
229 metric_level,
230 1,
231 );
232
233 let iter_log_op_type_counts = register_guarded_int_counter_vec_with_registry!(
234 "state_store_iter_log_op_type_counts",
235 "Counter of each op type in iter_log",
236 &["table_id", "op_type"],
237 registry
238 )
239 .unwrap();
240
241 let opts = histogram_opts!(
242 "state_store_sync_duration",
243 "Histogram of time spent on compacting shared buffer to remote storage",
244 time_buckets,
245 );
246 let sync_duration = register_histogram_with_registry!(opts, registry).unwrap();
247
248 let opts = histogram_opts!(
249 "state_store_sync_size",
250 "Total size of upload to l0 every epoch",
251 size_buckets,
252 );
253 let sync_size = register_histogram_with_registry!(opts, registry).unwrap();
254
255 Self {
256 get_duration,
257 get_key_size,
258 get_value_size,
259 iter_size,
260 iter_item,
261 iter_init_duration,
262 iter_scan_duration,
263 iter_counts,
264 iter_in_progress_counts,
265 iter_log_op_type_counts,
266 sync_duration,
267 sync_size,
268 }
269 }
270
271 pub fn unused() -> Self {
272 global_storage_metrics(MetricLevel::Disabled)
273 }
274
275 fn local_iter_metrics(&self, table_label: &str) -> LocalIterMetrics {
276 let inner = self.new_local_iter_metrics_inner(table_label, "iter");
277 LocalIterMetrics {
278 inner,
279 report_count: 0,
280 }
281 }
282
283 fn new_local_iter_metrics_inner(
284 &self,
285 table_label: &str,
286 iter_type: &str,
287 ) -> LocalIterMetricsInner {
288 let iter_init_duration = self
289 .iter_init_duration
290 .with_guarded_label_values(&[table_label, iter_type])
291 .local();
292 let iter_counts = self
293 .iter_counts
294 .with_guarded_label_values(&[table_label, iter_type])
295 .local();
296 let iter_scan_duration = self
297 .iter_scan_duration
298 .with_guarded_label_values(&[table_label, iter_type])
299 .local();
300 let iter_item = self
301 .iter_item
302 .with_guarded_label_values(&[table_label, iter_type])
303 .local();
304 let iter_size = self
305 .iter_size
306 .with_guarded_label_values(&[table_label, iter_type])
307 .local();
308 let iter_in_progress_counts = self
309 .iter_in_progress_counts
310 .with_guarded_label_values(&[table_label, iter_type]);
311
312 LocalIterMetricsInner {
313 iter_init_duration,
314 iter_scan_duration,
315 iter_counts,
316 iter_item,
317 iter_size,
318 iter_in_progress_counts,
319 }
320 }
321
322 fn local_iter_log_metrics(&self, table_label: &str) -> LocalIterLogMetrics {
323 let iter_metrics = self.new_local_iter_metrics_inner(table_label, "iter_log");
324 let insert_count = self
325 .iter_log_op_type_counts
326 .with_guarded_label_values(&[table_label, "INSERT"])
327 .local();
328 let update_count = self
329 .iter_log_op_type_counts
330 .with_guarded_label_values(&[table_label, "UPDATE"])
331 .local();
332 let delete_count = self
333 .iter_log_op_type_counts
334 .with_guarded_label_values(&[table_label, "DELETE"])
335 .local();
336 LocalIterLogMetrics {
337 iter_metrics,
338 insert_count,
339 update_count,
340 delete_count,
341 report_count: 0,
342 }
343 }
344
345 fn local_get_metrics(&self, table_label: &str) -> LocalGetMetrics {
346 let get_duration = self
347 .get_duration
348 .with_guarded_label_values(&[table_label])
349 .local();
350 let get_key_size = self
351 .get_key_size
352 .with_guarded_label_values(&[table_label])
353 .local();
354 let get_value_size = self
355 .get_value_size
356 .with_guarded_label_values(&[table_label])
357 .local();
358
359 LocalGetMetrics {
360 get_duration,
361 get_key_size,
362 get_value_size,
363 report_count: 0,
364 }
365 }
366}
367
/// Iterator metrics already resolved for one `[table_id, iter_type]` label pair.
///
/// The `Local*` members buffer observations until `flush` is called; the in-progress gauge
/// is a regular (non-local) gauge.
struct LocalIterMetricsInner {
    iter_init_duration: LabelGuardedLocalHistogram<2>,
    iter_scan_duration: LabelGuardedLocalHistogram<2>,
    iter_counts: LabelGuardedLocalIntCounter<2>,
    iter_item: LabelGuardedLocalHistogram<2>,
    iter_size: LabelGuardedLocalHistogram<2>,
    iter_in_progress_counts: LabelGuardedIntGauge<2>,
}
376
/// Local iterator metrics plus a counter of reports since the last flush.
struct LocalIterMetrics {
    inner: LocalIterMetricsInner,
    // Number of `may_flush` calls since the counter was last reset.
    report_count: usize,
}
381
382impl LocalIterMetrics {
383 fn may_flush(&mut self) {
384 self.report_count += 1;
385 if self.report_count > MAX_FLUSH_TIMES {
386 self.inner.flush();
387 self.report_count = 0;
388 }
389 }
390}
391
392impl LocalIterMetricsInner {
393 fn flush(&mut self) {
394 self.iter_scan_duration.flush();
395 self.iter_init_duration.flush();
396 self.iter_counts.flush();
397 self.iter_item.flush();
398 self.iter_size.flush();
399 }
400}
401
/// Point-get metrics already resolved for one `table_id`, buffered locally, plus a counter
/// of reports since the last flush.
struct LocalGetMetrics {
    get_duration: LabelGuardedLocalHistogram<1>,
    get_key_size: LabelGuardedLocalHistogram<1>,
    get_value_size: LabelGuardedLocalHistogram<1>,
    // Number of `may_flush` calls since the counter was last reset.
    report_count: usize,
}
408
409impl LocalGetMetrics {
410 fn may_flush(&mut self) {
411 self.report_count += 1;
412 if self.report_count > MAX_FLUSH_TIMES {
413 self.get_duration.flush();
414 self.get_key_size.flush();
415 self.get_value_size.flush();
416 self.report_count = 0;
417 }
418 }
419}
420
/// Local metrics for change-log iterators: the shared iterator metrics, one buffered
/// counter per op type, and a counter of reports since the last flush.
struct LocalIterLogMetrics {
    iter_metrics: LocalIterMetricsInner,
    insert_count: LabelGuardedLocalIntCounter<2>,
    update_count: LabelGuardedLocalIntCounter<2>,
    delete_count: LabelGuardedLocalIntCounter<2>,
    // Number of `may_flush` calls since the counter was last reset.
    report_count: usize,
}
428
429impl LocalIterLogMetrics {
430 fn may_flush(&mut self) {
431 self.report_count += 1;
432 if self.report_count > MAX_FLUSH_TIMES {
433 self.iter_metrics.flush();
434 self.insert_count.flush();
435 self.update_count.flush();
436 self.delete_count.flush();
437 self.report_count = 0;
438 }
439 }
440}
441
/// Statistics collector for one iterator, driven by [`MonitoredStateStoreIterStats`].
pub(crate) trait StateStoreIterStatsTrait: Send {
    /// Item type of the iterator being measured.
    type Item: IterItem;
    /// Creates a collector for an iterator over `table_id`; `iter_init_duration` is the
    /// time already spent initializing that iterator.
    fn new(table_id: u32, metrics: &MonitoredStorageMetrics, iter_init_duration: Duration) -> Self;
    /// Records one item produced by the iterator.
    fn observe(&mut self, item: <Self::Item as IterItem>::ItemRef<'_>);
    /// Publishes the collected statistics for `table_id` through `metrics`.
    fn report(&mut self, table_id: u32, metrics: &MonitoredStorageMetrics);
}
448
/// Report-count threshold for flushing thread-local metrics: each `may_flush` flushes only
/// when its report counter has grown beyond this value, then resets the counter.
const MAX_FLUSH_TIMES: usize = 64;
450
/// Raw per-iterator measurements shared by the different iterator stats collectors.
struct StateStoreIterStatsInner {
    /// Time spent initializing the iterator, as supplied to `new`.
    pub iter_init_duration: Duration,
    /// Instant captured when this value was created; scan time is measured from here.
    pub iter_scan_time: Instant,
    /// Number of items observed so far.
    pub total_items: usize,
    /// Number of bytes observed so far.
    pub total_size: usize,
}

impl StateStoreIterStatsInner {
    /// Starts a fresh measurement: the scan clock begins now and both totals are zero.
    fn new(iter_init_duration: Duration) -> Self {
        let scan_started_at = Instant::now();
        Self {
            iter_init_duration,
            iter_scan_time: scan_started_at,
            total_items: 0,
            total_size: 0,
        }
    }
}
468
/// Owns a stats collector together with the table id and metrics it reports to; the
/// collector's `report` is invoked when this value is dropped.
pub(crate) struct MonitoredStateStoreIterStats<S: StateStoreIterStatsTrait> {
    pub inner: S,
    pub table_id: u32,
    pub metrics: Arc<MonitoredStorageMetrics>,
}
474
475impl<S: StateStoreIterStatsTrait> Drop for MonitoredStateStoreIterStats<S> {
476 fn drop(&mut self) {
477 self.inner.report(self.table_id, &self.metrics)
478 }
479}
480
/// Stats collector for iterators yielding `StateStoreKeyedRow` items.
pub(crate) struct StateStoreIterStats {
    inner: StateStoreIterStatsInner,
}
484
485impl StateStoreIterStats {
486 fn for_table_metrics(
487 table_id: u32,
488 global_metrics: &MonitoredStorageMetrics,
489 f: impl FnOnce(&mut LocalIterMetrics),
490 ) {
491 thread_local!(static LOCAL_ITER_METRICS: RefCell<HashMap<u32, LocalIterMetrics>> = RefCell::new(HashMap::default()));
492 LOCAL_ITER_METRICS.with_borrow_mut(|local_metrics| {
493 let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
494 let table_label = table_id.to_string();
495 global_metrics.local_iter_metrics(&table_label)
496 });
497 f(table_metrics)
498 });
499 }
500}
501
502impl StateStoreIterStatsTrait for StateStoreIterStats {
503 type Item = StateStoreKeyedRow;
504
505 fn new(table_id: u32, metrics: &MonitoredStorageMetrics, iter_init_duration: Duration) -> Self {
506 Self::for_table_metrics(table_id, metrics, |metrics| {
507 metrics.inner.iter_in_progress_counts.inc();
508 });
509 Self {
510 inner: StateStoreIterStatsInner::new(iter_init_duration),
511 }
512 }
513
514 fn observe(&mut self, (key, value): StateStoreKeyedRowRef<'_>) {
515 self.inner.total_items += 1;
516 self.inner.total_size += key.encoded_len() + value.len();
517 }
518
519 fn report(&mut self, table_id: u32, metrics: &MonitoredStorageMetrics) {
520 Self::for_table_metrics(table_id, metrics, |table_metrics| {
521 self.inner.apply_to_local(&mut table_metrics.inner);
522 table_metrics.may_flush();
523 });
524 }
525}
526
527impl StateStoreIterStatsInner {
528 fn apply_to_local(&self, table_metrics: &mut LocalIterMetricsInner) {
529 {
530 let iter_scan_duration = self.iter_scan_time.elapsed();
531 table_metrics
532 .iter_scan_duration
533 .observe(iter_scan_duration.as_secs_f64());
534 table_metrics
535 .iter_init_duration
536 .observe(self.iter_init_duration.as_secs_f64());
537 table_metrics.iter_counts.inc();
538 table_metrics.iter_item.observe(self.total_items as f64);
539 table_metrics.iter_size.observe(self.total_size as f64);
540 table_metrics.iter_in_progress_counts.dec();
541 }
542 }
543}
544
/// Stats collector for change-log iterators yielding `StateStoreReadLogItem` items; on top
/// of the common totals it counts how many items of each op type were observed.
pub(crate) struct StateStoreIterLogStats {
    inner: StateStoreIterStatsInner,
    insert_count: u64,
    update_count: u64,
    delete_count: u64,
}
551
552impl StateStoreIterLogStats {
553 fn for_table_metrics(
554 table_id: u32,
555 global_metrics: &MonitoredStorageMetrics,
556 f: impl FnOnce(&mut LocalIterLogMetrics),
557 ) {
558 thread_local!(static LOCAL_ITER_LOG_METRICS: RefCell<HashMap<u32, LocalIterLogMetrics>> = RefCell::new(HashMap::default()));
559 LOCAL_ITER_LOG_METRICS.with_borrow_mut(|local_metrics| {
560 let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
561 let table_label = table_id.to_string();
562 global_metrics.local_iter_log_metrics(&table_label)
563 });
564 f(table_metrics)
565 });
566 }
567}
568
569impl StateStoreIterStatsTrait for StateStoreIterLogStats {
570 type Item = StateStoreReadLogItem;
571
572 fn new(table_id: u32, metrics: &MonitoredStorageMetrics, iter_init_duration: Duration) -> Self {
573 Self::for_table_metrics(table_id, metrics, |metrics| {
574 metrics.iter_metrics.iter_in_progress_counts.inc();
575 });
576 Self {
577 inner: StateStoreIterStatsInner::new(iter_init_duration),
578 insert_count: 0,
579 update_count: 0,
580 delete_count: 0,
581 }
582 }
583
584 fn observe(&mut self, (key, change_log): StateStoreReadLogItemRef<'_>) {
585 self.inner.total_items += 1;
586 let value_len = match change_log {
587 ChangeLogValue::Insert(value) => {
588 self.insert_count += 1;
589 value.len()
590 }
591 ChangeLogValue::Update {
592 old_value,
593 new_value,
594 } => {
595 self.update_count += 1;
596 old_value.len() + new_value.len()
597 }
598 ChangeLogValue::Delete(value) => {
599 self.delete_count += 1;
600 value.len()
601 }
602 };
603 self.inner.total_size += key.len() + value_len;
604 }
605
606 fn report(&mut self, table_id: u32, metrics: &MonitoredStorageMetrics) {
607 Self::for_table_metrics(table_id, metrics, |table_metrics| {
608 self.inner.apply_to_local(&mut table_metrics.iter_metrics);
609 table_metrics.insert_count.inc_by(self.insert_count);
610 table_metrics.update_count.inc_by(self.update_count);
611 table_metrics.delete_count.inc_by(self.delete_count);
612 table_metrics.may_flush();
613 });
614 }
615}
616
/// Measurements for a single point get, published by `report`.
pub(crate) struct MonitoredStateStoreGetStats {
    /// Instant captured in `new`; despite the name this is the start time, and `report`
    /// observes the time elapsed since it.
    pub get_duration: Instant,
    /// Key size to report; starts at 0 (presumably filled in by the caller — confirm).
    pub get_key_size: usize,
    /// Value size to report; starts at 0 and is only observed by `report` when positive.
    pub get_value_size: usize,
    pub table_id: u32,
    pub metrics: Arc<MonitoredStorageMetrics>,
}
624
625impl MonitoredStateStoreGetStats {
626 pub(crate) fn new(table_id: u32, metrics: Arc<MonitoredStorageMetrics>) -> Self {
627 Self {
628 get_duration: Instant::now(),
629 get_key_size: 0,
630 get_value_size: 0,
631 table_id,
632 metrics,
633 }
634 }
635
636 pub(crate) fn report(&self) {
637 thread_local!(static LOCAL_GET_METRICS: RefCell<HashMap<u32, LocalGetMetrics>> = RefCell::new(HashMap::default()));
638 LOCAL_GET_METRICS.with_borrow_mut(|local_metrics| {
639 let table_metrics = local_metrics.entry(self.table_id).or_insert_with(|| {
640 let table_label = self.table_id.to_string();
641 self.metrics.local_get_metrics(&table_label)
642 });
643 let get_duration = self.get_duration.elapsed();
644 table_metrics
645 .get_duration
646 .observe(get_duration.as_secs_f64());
647 table_metrics.get_key_size.observe(self.get_key_size as _);
648 if self.get_value_size > 0 {
649 table_metrics
650 .get_value_size
651 .observe(self.get_value_size as _);
652 }
653 table_metrics.may_flush();
654 });
655 }
656}