risingwave_frontend/scheduler/distributed/
stats.rs1use std::sync::LazyLock;
16
17use prometheus::core::{AtomicU64, GenericCounter};
18use prometheus::{
19 Histogram, IntGauge, Registry, exponential_buckets, histogram_opts,
20 register_histogram_with_registry, register_int_counter_with_registry,
21 register_int_gauge_with_registry,
22};
23use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
24
25#[derive(Clone)]
26pub struct DistributedQueryMetrics {
27 pub running_query_num: IntGauge,
28 pub rejected_query_counter: GenericCounter<AtomicU64>,
29 pub completed_query_counter: GenericCounter<AtomicU64>,
30 pub query_latency: Histogram,
31}
32
33pub static GLOBAL_DISTRIBUTED_QUERY_METRICS: LazyLock<DistributedQueryMetrics> =
34 LazyLock::new(|| DistributedQueryMetrics::new(&GLOBAL_METRICS_REGISTRY));
35
36impl DistributedQueryMetrics {
37 fn new(registry: &Registry) -> Self {
38 let running_query_num = register_int_gauge_with_registry!(
39 "distributed_running_query_num",
40 "The number of running query of distributed execution mode",
41 registry
42 )
43 .unwrap();
44
45 let rejected_query_counter = register_int_counter_with_registry!(
46 "distributed_rejected_query_counter",
47 "The number of rejected query in distributed execution mode. ",
48 registry
49 )
50 .unwrap();
51
52 let completed_query_counter = register_int_counter_with_registry!(
53 "distributed_completed_query_counter",
54 "The number of query ended successfully in distributed execution mode",
55 registry
56 )
57 .unwrap();
58
59 let opts = histogram_opts!(
60 "distributed_query_latency",
61 "latency of query executed successfully in distributed execution mode",
62 exponential_buckets(0.01, 2.0, 23).unwrap()
63 );
64
65 let query_latency = register_histogram_with_registry!(opts, registry).unwrap();
66
67 Self {
68 running_query_num,
69 rejected_query_counter,
70 completed_query_counter,
71 query_latency,
72 }
73 }
74
75 pub fn for_test() -> Self {
77 GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()
78 }
79}