risingwave_frontend/scheduler/distributed/
stats.rs1use std::sync::LazyLock;
16
17use prometheus::core::{AtomicU64, GenericCounter};
18use prometheus::{
19    Histogram, IntGauge, Registry, exponential_buckets, histogram_opts,
20    register_histogram_with_registry, register_int_counter_with_registry,
21    register_int_gauge_with_registry,
22};
23use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
24
25#[derive(Clone)]
26pub struct DistributedQueryMetrics {
27    pub running_query_num: IntGauge,
28    pub rejected_query_counter: GenericCounter<AtomicU64>,
29    pub completed_query_counter: GenericCounter<AtomicU64>,
30    pub query_latency: Histogram,
31}
32
33pub static GLOBAL_DISTRIBUTED_QUERY_METRICS: LazyLock<DistributedQueryMetrics> =
34    LazyLock::new(|| DistributedQueryMetrics::new(&GLOBAL_METRICS_REGISTRY));
35
36impl DistributedQueryMetrics {
37    fn new(registry: &Registry) -> Self {
38        let running_query_num = register_int_gauge_with_registry!(
39            "distributed_running_query_num",
40            "The number of running query of distributed execution mode",
41            registry
42        )
43        .unwrap();
44
45        let rejected_query_counter = register_int_counter_with_registry!(
46            "distributed_rejected_query_counter",
47            "The number of rejected query in distributed execution mode. ",
48            registry
49        )
50        .unwrap();
51
52        let completed_query_counter = register_int_counter_with_registry!(
53            "distributed_completed_query_counter",
54            "The number of query ended successfully in distributed execution mode",
55            registry
56        )
57        .unwrap();
58
59        let opts = histogram_opts!(
60            "distributed_query_latency",
61            "latency of query executed successfully in distributed execution mode",
62            exponential_buckets(0.01, 2.0, 23).unwrap()
63        );
64
65        let query_latency = register_histogram_with_registry!(opts, registry).unwrap();
66
67        Self {
68            running_query_num,
69            rejected_query_counter,
70            completed_query_counter,
71            query_latency,
72        }
73    }
74
75    pub fn for_test() -> Self {
77        GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()
78    }
79}