1use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::{Arc, OnceLock};
18use std::time::{Duration, Instant};
19
20use prometheus::{
21 Histogram, Registry, exponential_buckets, histogram_opts, linear_buckets,
22 register_histogram_with_registry,
23};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::id::TableId;
26use risingwave_common::metrics::{
27 LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedLocalHistogram,
28 LabelGuardedLocalIntCounter, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
29 RelabeledGuardedIntGaugeVec,
30};
31use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
32use risingwave_common::{
33 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
34 register_guarded_int_gauge_vec_with_registry,
35};
36
37use crate::store::{
38 ChangeLogValue, IterItem, StateStoreKeyedRow, StateStoreKeyedRowRef, StateStoreReadLogItem,
39 StateStoreReadLogItemRef,
40};
41
/// Prometheus metrics for state store operations: point `get`s, iterators (`iter` and
/// `iter_log`), and `sync`.
#[derive(Debug, Clone)]
pub struct MonitoredStorageMetrics {
    /// Latency of `get`, labeled by `table_id`.
    pub get_duration: RelabeledGuardedHistogramVec,
    /// Key bytes of `get`, labeled by `table_id`.
    pub get_key_size: RelabeledGuardedHistogramVec,
    /// Value bytes returned by `get`, labeled by `table_id`.
    pub get_value_size: RelabeledGuardedHistogramVec,

    /// Key + value bytes read per iterator, labeled by `table_id` and `iter_type`.
    pub iter_size: RelabeledGuardedHistogramVec,
    /// Number of items read per iterator, labeled by `table_id` and `iter_type`.
    pub iter_item: RelabeledGuardedHistogramVec,
    /// Time spent on iterator initialization, labeled by `table_id` and `iter_type`.
    pub iter_init_duration: RelabeledGuardedHistogramVec,
    /// Time spent on iterator scanning, labeled by `table_id` and `iter_type`.
    pub iter_scan_duration: RelabeledGuardedHistogramVec,
    /// Number of iterators that have been reported, labeled by `table_id` and `iter_type`.
    pub iter_counts: RelabeledGuardedIntCounterVec,
    /// Number of iterators currently alive, labeled by `table_id` and `iter_type`.
    pub iter_in_progress_counts: RelabeledGuardedIntGaugeVec,

    /// Count of each change-log op type read by `iter_log`, labeled by `table_id` and
    /// `op_type` (`INSERT`, `UPDATE`, `DELETE`).
    pub iter_log_op_type_counts: LabelGuardedIntCounterVec,

    /// Time spent on compacting the shared buffer to remote storage (unlabeled).
    pub sync_duration: Histogram,
    /// Total size uploaded to L0 every epoch (unlabeled).
    pub sync_size: Histogram,
}
63
64pub static GLOBAL_STORAGE_METRICS: OnceLock<MonitoredStorageMetrics> = OnceLock::new();
65
66pub fn global_storage_metrics(metric_level: MetricLevel) -> MonitoredStorageMetrics {
67 GLOBAL_STORAGE_METRICS
68 .get_or_init(|| MonitoredStorageMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
69 .clone()
70}
71
72impl MonitoredStorageMetrics {
73 pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
74 let size_buckets = exponential_buckets(256.0, 16.0, 8).unwrap();
76 let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
78 let opts = histogram_opts!(
80 "state_store_get_key_size",
81 "Total key bytes of get that have been issued to state store",
82 size_buckets.clone()
83 );
84 let get_key_size =
85 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
86 let get_key_size = RelabeledGuardedHistogramVec::with_metric_level(
87 MetricLevel::Debug,
88 get_key_size,
89 metric_level,
90 );
91
92 let opts = histogram_opts!(
93 "state_store_get_value_size",
94 "Total value bytes that have been requested from remote storage",
95 size_buckets.clone()
96 );
97 let get_value_size =
98 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
99 let get_value_size = RelabeledGuardedHistogramVec::with_metric_level(
100 MetricLevel::Debug,
101 get_value_size,
102 metric_level,
103 );
104
105 let mut buckets = exponential_buckets(0.000004, 2.0, 4).unwrap(); buckets.extend(linear_buckets(0.00006, 0.00004, 5).unwrap()); buckets.extend(linear_buckets(0.0003, 0.0001, 3).unwrap()); buckets.extend(exponential_buckets(0.001, 2.0, 5).unwrap()); buckets.extend(exponential_buckets(0.05, 4.0, 5).unwrap()); buckets.push(16.0); let mut state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
114 state_store_read_time_buckets.push(40.0);
115 state_store_read_time_buckets.push(100.0);
116
117 let get_duration_opts = histogram_opts!(
118 "state_store_get_duration",
119 "Total latency of get that have been issued to state store",
120 state_store_read_time_buckets.clone(),
121 );
122 let get_duration = register_guarded_histogram_vec_with_registry!(
123 get_duration_opts,
124 &["table_id"],
125 registry
126 )
127 .unwrap();
128 let get_duration = RelabeledGuardedHistogramVec::with_metric_level(
129 MetricLevel::Critical,
130 get_duration,
131 metric_level,
132 );
133
134 let opts = histogram_opts!(
135 "state_store_iter_size",
136 "Total bytes gotten from state store scan(), for calculating read throughput",
137 size_buckets.clone()
138 );
139 let iter_size = register_guarded_histogram_vec_with_registry!(
140 opts,
141 &["table_id", "iter_type"],
142 registry
143 )
144 .unwrap();
145 let iter_size = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
146 MetricLevel::Debug,
147 iter_size,
148 metric_level,
149 1,
150 );
151
152 let opts = histogram_opts!(
153 "state_store_iter_item",
154 "Total bytes gotten from state store scan(), for calculating read throughput",
155 size_buckets.clone(),
156 );
157 let iter_item = register_guarded_histogram_vec_with_registry!(
158 opts,
159 &["table_id", "iter_type"],
160 registry
161 )
162 .unwrap();
163 let iter_item = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
164 MetricLevel::Debug,
165 iter_item,
166 metric_level,
167 1,
168 );
169
170 let opts = histogram_opts!(
171 "state_store_iter_init_duration",
172 "Histogram of the time spent on iterator initialization.",
173 state_store_read_time_buckets.clone(),
174 );
175 let iter_init_duration = register_guarded_histogram_vec_with_registry!(
176 opts,
177 &["table_id", "iter_type"],
178 registry
179 )
180 .unwrap();
181 let iter_init_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
182 MetricLevel::Critical,
183 iter_init_duration,
184 metric_level,
185 1,
186 );
187
188 let opts = histogram_opts!(
189 "state_store_iter_scan_duration",
190 "Histogram of the time spent on iterator scanning.",
191 state_store_read_time_buckets.clone(),
192 );
193 let iter_scan_duration = register_guarded_histogram_vec_with_registry!(
194 opts,
195 &["table_id", "iter_type"],
196 registry
197 )
198 .unwrap();
199 let iter_scan_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
200 MetricLevel::Critical,
201 iter_scan_duration,
202 metric_level,
203 1,
204 );
205
206 let iter_counts = register_guarded_int_counter_vec_with_registry!(
207 "state_store_iter_counts",
208 "Total number of iter that have been issued to state store",
209 &["table_id", "iter_type"],
210 registry
211 )
212 .unwrap();
213 let iter_counts = RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
214 MetricLevel::Debug,
215 iter_counts,
216 metric_level,
217 1,
218 );
219
220 let iter_in_progress_counts = register_guarded_int_gauge_vec_with_registry!(
221 "state_store_iter_in_progress_counts",
222 "Total number of existing iter",
223 &["table_id", "iter_type"],
224 registry
225 )
226 .unwrap();
227 let iter_in_progress_counts = RelabeledGuardedIntGaugeVec::with_metric_level_relabel_n(
228 MetricLevel::Debug,
229 iter_in_progress_counts,
230 metric_level,
231 1,
232 );
233
234 let iter_log_op_type_counts = register_guarded_int_counter_vec_with_registry!(
235 "state_store_iter_log_op_type_counts",
236 "Counter of each op type in iter_log",
237 &["table_id", "op_type"],
238 registry
239 )
240 .unwrap();
241
242 let opts = histogram_opts!(
243 "state_store_sync_duration",
244 "Histogram of time spent on compacting shared buffer to remote storage",
245 time_buckets,
246 );
247 let sync_duration = register_histogram_with_registry!(opts, registry).unwrap();
248
249 let opts = histogram_opts!(
250 "state_store_sync_size",
251 "Total size of upload to l0 every epoch",
252 size_buckets,
253 );
254 let sync_size = register_histogram_with_registry!(opts, registry).unwrap();
255
256 Self {
257 get_duration,
258 get_key_size,
259 get_value_size,
260 iter_size,
261 iter_item,
262 iter_init_duration,
263 iter_scan_duration,
264 iter_counts,
265 iter_in_progress_counts,
266 iter_log_op_type_counts,
267 sync_duration,
268 sync_size,
269 }
270 }
271
272 pub fn unused() -> Self {
273 global_storage_metrics(MetricLevel::Disabled)
274 }
275
276 fn local_iter_metrics(&self, table_label: &str) -> LocalIterMetrics {
277 let inner = self.new_local_iter_metrics_inner(table_label, "iter");
278 LocalIterMetrics {
279 inner,
280 report_count: 0,
281 }
282 }
283
284 fn new_local_iter_metrics_inner(
285 &self,
286 table_label: &str,
287 iter_type: &str,
288 ) -> LocalIterMetricsInner {
289 let iter_init_duration = self
290 .iter_init_duration
291 .with_guarded_label_values(&[table_label, iter_type])
292 .local();
293 let iter_counts = self
294 .iter_counts
295 .with_guarded_label_values(&[table_label, iter_type])
296 .local();
297 let iter_scan_duration = self
298 .iter_scan_duration
299 .with_guarded_label_values(&[table_label, iter_type])
300 .local();
301 let iter_item = self
302 .iter_item
303 .with_guarded_label_values(&[table_label, iter_type])
304 .local();
305 let iter_size = self
306 .iter_size
307 .with_guarded_label_values(&[table_label, iter_type])
308 .local();
309 let iter_in_progress_counts = self
310 .iter_in_progress_counts
311 .with_guarded_label_values(&[table_label, iter_type]);
312
313 LocalIterMetricsInner {
314 iter_init_duration,
315 iter_scan_duration,
316 iter_counts,
317 iter_item,
318 iter_size,
319 iter_in_progress_counts,
320 }
321 }
322
323 fn local_iter_log_metrics(&self, table_label: &str) -> LocalIterLogMetrics {
324 let iter_metrics = self.new_local_iter_metrics_inner(table_label, "iter_log");
325 let insert_count = self
326 .iter_log_op_type_counts
327 .with_guarded_label_values(&[table_label, "INSERT"])
328 .local();
329 let update_count = self
330 .iter_log_op_type_counts
331 .with_guarded_label_values(&[table_label, "UPDATE"])
332 .local();
333 let delete_count = self
334 .iter_log_op_type_counts
335 .with_guarded_label_values(&[table_label, "DELETE"])
336 .local();
337 LocalIterLogMetrics {
338 iter_metrics,
339 insert_count,
340 update_count,
341 delete_count,
342 report_count: 0,
343 }
344 }
345
346 fn local_get_metrics(&self, table_label: &str) -> LocalGetMetrics {
347 let get_duration = self
348 .get_duration
349 .with_guarded_label_values(&[table_label])
350 .local();
351 let get_key_size = self
352 .get_key_size
353 .with_guarded_label_values(&[table_label])
354 .local();
355 let get_value_size = self
356 .get_value_size
357 .with_guarded_label_values(&[table_label])
358 .local();
359
360 LocalGetMetrics {
361 get_duration,
362 get_key_size,
363 get_value_size,
364 report_count: 0,
365 }
366 }
367}
368
/// Per-thread handles for the iterator metrics of one (`table_id`, `iter_type`) label pair.
///
/// All fields except `iter_in_progress_counts` are local handles that `flush` pushes out;
/// the in-progress gauge is a shared handle and is updated directly.
struct LocalIterMetricsInner {
    iter_init_duration: LabelGuardedLocalHistogram,
    iter_scan_duration: LabelGuardedLocalHistogram,
    iter_counts: LabelGuardedLocalIntCounter,
    iter_item: LabelGuardedLocalHistogram,
    iter_size: LabelGuardedLocalHistogram,
    // Not a local handle: incremented on iterator creation and decremented on report, so it
    // is never part of `flush`.
    iter_in_progress_counts: LabelGuardedIntGauge,
}
377
/// Thread-local iterator metrics for plain (`iter_type = "iter"`) iterators of one table.
struct LocalIterMetrics {
    inner: LocalIterMetricsInner,
    // Reports received since the last flush; see `may_flush`.
    report_count: usize,
}
382
383impl LocalIterMetrics {
384 fn may_flush(&mut self) {
385 self.report_count += 1;
386 if self.report_count > MAX_FLUSH_TIMES {
387 self.inner.flush();
388 self.report_count = 0;
389 }
390 }
391}
392
393impl LocalIterMetricsInner {
394 fn flush(&mut self) {
395 self.iter_scan_duration.flush();
396 self.iter_init_duration.flush();
397 self.iter_counts.flush();
398 self.iter_item.flush();
399 self.iter_size.flush();
400 }
401}
402
/// Thread-local `get` metric handles for one table.
struct LocalGetMetrics {
    get_duration: LabelGuardedLocalHistogram,
    get_key_size: LabelGuardedLocalHistogram,
    get_value_size: LabelGuardedLocalHistogram,
    // Reports received since the last flush; see `may_flush`.
    report_count: usize,
}
409
410impl LocalGetMetrics {
411 fn may_flush(&mut self) {
412 self.report_count += 1;
413 if self.report_count > MAX_FLUSH_TIMES {
414 self.get_duration.flush();
415 self.get_key_size.flush();
416 self.get_value_size.flush();
417 self.report_count = 0;
418 }
419 }
420}
421
/// Thread-local metrics for `iter_log` iterators (`iter_type = "iter_log"`) of one table.
struct LocalIterLogMetrics {
    iter_metrics: LocalIterMetricsInner,
    // Local handles of `state_store_iter_log_op_type_counts` for `op_type` = INSERT, UPDATE
    // and DELETE respectively.
    insert_count: LabelGuardedLocalIntCounter,
    update_count: LabelGuardedLocalIntCounter,
    delete_count: LabelGuardedLocalIntCounter,
    // Reports received since the last flush; see `may_flush`.
    report_count: usize,
}
429
430impl LocalIterLogMetrics {
431 fn may_flush(&mut self) {
432 self.report_count += 1;
433 if self.report_count > MAX_FLUSH_TIMES {
434 self.iter_metrics.flush();
435 self.insert_count.flush();
436 self.update_count.flush();
437 self.delete_count.flush();
438 self.report_count = 0;
439 }
440 }
441}
442
/// Collector of per-iterator statistics that are reported into [`MonitoredStorageMetrics`].
///
/// `MonitoredStateStoreIterStats` calls `report` when it is dropped. NOTE(review): `observe`
/// is presumably called for every item the iterator yields — confirm at the call sites.
pub(crate) trait StateStoreIterStatsTrait: Send {
    /// Item type of the iterator whose items are observed.
    type Item: IterItem;
    /// Starts collecting statistics for an iterator on `table_id` whose initialization took
    /// `iter_init_duration`.
    fn new(
        table_id: TableId,
        metrics: &MonitoredStorageMetrics,
        iter_init_duration: Duration,
    ) -> Self;
    /// Accounts for one item read from the iterator.
    fn observe(&mut self, item: <Self::Item as IterItem>::ItemRef<'_>);
    /// Records the accumulated statistics for `table_id` into `metrics`.
    fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics);
}
453
/// Reports a thread-local metric set may accumulate before its local handles are flushed.
/// The `may_flush` methods compare with `>`, so a flush happens on every 65th report.
const MAX_FLUSH_TIMES: usize = 64;
455
/// Statistics shared by plain iterators and `iter_log` iterators.
struct StateStoreIterStatsInner {
    /// How long the iterator took to initialize, as passed to `new`.
    pub iter_init_duration: Duration,
    /// Moment this struct was created; scan time is measured from here.
    pub iter_scan_time: Instant,
    /// Number of items observed so far.
    pub total_items: usize,
    /// Accumulated byte size of the observed items.
    pub total_size: usize,
}

impl StateStoreIterStatsInner {
    /// Starts the scan clock now, with no items observed yet.
    fn new(iter_init_duration: Duration) -> Self {
        let iter_scan_time = Instant::now();
        Self {
            iter_scan_time,
            iter_init_duration,
            total_size: 0,
            total_items: 0,
        }
    }
}
473
/// Owns an iterator-statistics collector `S` and reports it for `table_id` into `metrics`
/// when dropped (see the `Drop` impl).
pub(crate) struct MonitoredStateStoreIterStats<S: StateStoreIterStatsTrait> {
    /// The statistics being collected.
    pub inner: S,
    /// Table the iterator reads; passed to `S::report`.
    pub table_id: TableId,
    /// Metrics the statistics are reported into.
    pub metrics: Arc<MonitoredStorageMetrics>,
}
479
480impl<S: StateStoreIterStatsTrait> Drop for MonitoredStateStoreIterStats<S> {
481 fn drop(&mut self) {
482 self.inner.report(self.table_id, &self.metrics)
483 }
484}
485
/// Statistics for a plain key-value state store iterator, reported with
/// `iter_type = "iter"`.
pub(crate) struct StateStoreIterStats {
    inner: StateStoreIterStatsInner,
}
489
490impl StateStoreIterStats {
491 fn for_table_metrics(
492 table_id: TableId,
493 global_metrics: &MonitoredStorageMetrics,
494 f: impl FnOnce(&mut LocalIterMetrics),
495 ) {
496 thread_local!(static LOCAL_ITER_METRICS: RefCell<HashMap<TableId, LocalIterMetrics>> = RefCell::new(HashMap::default()));
497 LOCAL_ITER_METRICS.with_borrow_mut(|local_metrics| {
498 let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
499 let table_label = table_id.to_string();
500 global_metrics.local_iter_metrics(&table_label)
501 });
502 f(table_metrics)
503 });
504 }
505}
506
507impl StateStoreIterStatsTrait for StateStoreIterStats {
508 type Item = StateStoreKeyedRow;
509
510 fn new(
511 table_id: TableId,
512 metrics: &MonitoredStorageMetrics,
513 iter_init_duration: Duration,
514 ) -> Self {
515 Self::for_table_metrics(table_id, metrics, |metrics| {
516 metrics.inner.iter_in_progress_counts.inc();
517 });
518 Self {
519 inner: StateStoreIterStatsInner::new(iter_init_duration),
520 }
521 }
522
523 fn observe(&mut self, (key, value): StateStoreKeyedRowRef<'_>) {
524 self.inner.total_items += 1;
525 self.inner.total_size += key.encoded_len() + value.len();
526 }
527
528 fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics) {
529 Self::for_table_metrics(table_id, metrics, |table_metrics| {
530 self.inner.apply_to_local(&mut table_metrics.inner);
531 table_metrics.may_flush();
532 });
533 }
534}
535
536impl StateStoreIterStatsInner {
537 fn apply_to_local(&self, table_metrics: &mut LocalIterMetricsInner) {
538 {
539 let iter_scan_duration = self.iter_scan_time.elapsed();
540 table_metrics
541 .iter_scan_duration
542 .observe(iter_scan_duration.as_secs_f64());
543 table_metrics
544 .iter_init_duration
545 .observe(self.iter_init_duration.as_secs_f64());
546 table_metrics.iter_counts.inc();
547 table_metrics.iter_item.observe(self.total_items as f64);
548 table_metrics.iter_size.observe(self.total_size as f64);
549 table_metrics.iter_in_progress_counts.dec();
550 }
551 }
552}
553
/// Statistics for an `iter_log` iterator, reported with `iter_type = "iter_log"`. In addition
/// to the shared statistics, it counts the change-log operation types it observed.
pub(crate) struct StateStoreIterLogStats {
    inner: StateStoreIterStatsInner,
    // Number of `ChangeLogValue::Insert` items observed.
    insert_count: u64,
    // Number of `ChangeLogValue::Update` items observed.
    update_count: u64,
    // Number of `ChangeLogValue::Delete` items observed.
    delete_count: u64,
}
560
561impl StateStoreIterLogStats {
562 fn for_table_metrics(
563 table_id: TableId,
564 global_metrics: &MonitoredStorageMetrics,
565 f: impl FnOnce(&mut LocalIterLogMetrics),
566 ) {
567 thread_local!(static LOCAL_ITER_LOG_METRICS: RefCell<HashMap<TableId, LocalIterLogMetrics>> = RefCell::new(HashMap::default()));
568 LOCAL_ITER_LOG_METRICS.with_borrow_mut(|local_metrics| {
569 let table_metrics = local_metrics.entry(table_id).or_insert_with(|| {
570 let table_label = table_id.to_string();
571 global_metrics.local_iter_log_metrics(&table_label)
572 });
573 f(table_metrics)
574 });
575 }
576}
577
578impl StateStoreIterStatsTrait for StateStoreIterLogStats {
579 type Item = StateStoreReadLogItem;
580
581 fn new(
582 table_id: TableId,
583 metrics: &MonitoredStorageMetrics,
584 iter_init_duration: Duration,
585 ) -> Self {
586 Self::for_table_metrics(table_id, metrics, |metrics| {
587 metrics.iter_metrics.iter_in_progress_counts.inc();
588 });
589 Self {
590 inner: StateStoreIterStatsInner::new(iter_init_duration),
591 insert_count: 0,
592 update_count: 0,
593 delete_count: 0,
594 }
595 }
596
597 fn observe(&mut self, (key, change_log): StateStoreReadLogItemRef<'_>) {
598 self.inner.total_items += 1;
599 let value_len = match change_log {
600 ChangeLogValue::Insert(value) => {
601 self.insert_count += 1;
602 value.len()
603 }
604 ChangeLogValue::Update {
605 old_value,
606 new_value,
607 } => {
608 self.update_count += 1;
609 old_value.len() + new_value.len()
610 }
611 ChangeLogValue::Delete(value) => {
612 self.delete_count += 1;
613 value.len()
614 }
615 };
616 self.inner.total_size += key.len() + value_len;
617 }
618
619 fn report(&mut self, table_id: TableId, metrics: &MonitoredStorageMetrics) {
620 Self::for_table_metrics(table_id, metrics, |table_metrics| {
621 self.inner.apply_to_local(&mut table_metrics.iter_metrics);
622 table_metrics.insert_count.inc_by(self.insert_count);
623 table_metrics.update_count.inc_by(self.update_count);
624 table_metrics.delete_count.inc_by(self.delete_count);
625 table_metrics.may_flush();
626 });
627 }
628}
629
/// Statistics of a single state store `get` on one table, recorded into the metrics by
/// `report`.
pub(crate) struct MonitoredStateStoreGetStats {
    /// Start time of the `get`. Despite the field name this is an `Instant`; `report` computes
    /// the latency from it.
    pub get_duration: Instant,
    /// Key bytes of the `get`; `new` sets it to 0. NOTE(review): presumably filled in by the
    /// caller — confirm at the call sites.
    pub get_key_size: usize,
    /// Value bytes returned by the `get`; `new` sets it to 0, and `report` skips recording it
    /// while it is still 0.
    pub get_value_size: usize,
    /// Table the `get` was issued against; used as the metric label.
    pub table_id: TableId,
    /// Metrics the thread-local per-table handles are created from.
    pub metrics: Arc<MonitoredStorageMetrics>,
}
637
638impl MonitoredStateStoreGetStats {
639 pub(crate) fn new(table_id: TableId, metrics: Arc<MonitoredStorageMetrics>) -> Self {
640 Self {
641 get_duration: Instant::now(),
642 get_key_size: 0,
643 get_value_size: 0,
644 table_id,
645 metrics,
646 }
647 }
648
649 pub(crate) fn report(&self) {
650 thread_local!(static LOCAL_GET_METRICS: RefCell<HashMap<TableId, LocalGetMetrics>> = RefCell::new(HashMap::default()));
651 LOCAL_GET_METRICS.with_borrow_mut(|local_metrics| {
652 let table_metrics = local_metrics.entry(self.table_id).or_insert_with(|| {
653 let table_label = self.table_id.to_string();
654 self.metrics.local_get_metrics(&table_label)
655 });
656 let get_duration = self.get_duration.elapsed();
657 table_metrics
658 .get_duration
659 .observe(get_duration.as_secs_f64());
660 table_metrics.get_key_size.observe(self.get_key_size as _);
661 if self.get_value_size > 0 {
662 table_metrics
663 .get_value_size
664 .observe(self.get_value_size as _);
665 }
666 table_metrics.may_flush();
667 });
668 }
669}