risingwave_frontend/scheduler/distributed/
stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use prometheus::core::{AtomicU64, GenericCounter};
18use prometheus::{
19    Histogram, IntGauge, Registry, exponential_buckets, histogram_opts,
20    register_histogram_with_registry, register_int_counter_with_registry,
21    register_int_gauge_with_registry,
22};
23use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
24
25#[derive(Clone)]
26pub struct DistributedQueryMetrics {
27    pub running_query_num: IntGauge,
28    pub rejected_query_counter: GenericCounter<AtomicU64>,
29    pub completed_query_counter: GenericCounter<AtomicU64>,
30    pub query_latency: Histogram,
31}
32
33pub static GLOBAL_DISTRIBUTED_QUERY_METRICS: LazyLock<DistributedQueryMetrics> =
34    LazyLock::new(|| DistributedQueryMetrics::new(&GLOBAL_METRICS_REGISTRY));
35
36impl DistributedQueryMetrics {
37    fn new(registry: &Registry) -> Self {
38        let running_query_num = register_int_gauge_with_registry!(
39            "distributed_running_query_num",
40            "The number of running query of distributed execution mode",
41            registry
42        )
43        .unwrap();
44
45        let rejected_query_counter = register_int_counter_with_registry!(
46            "distributed_rejected_query_counter",
47            "The number of rejected query in distributed execution mode. ",
48            registry
49        )
50        .unwrap();
51
52        let completed_query_counter = register_int_counter_with_registry!(
53            "distributed_completed_query_counter",
54            "The number of query ended successfully in distributed execution mode",
55            registry
56        )
57        .unwrap();
58
59        let opts = histogram_opts!(
60            "distributed_query_latency",
61            "latency of query executed successfully in distributed execution mode",
62            exponential_buckets(0.01, 2.0, 23).unwrap()
63        );
64
65        let query_latency = register_histogram_with_registry!(opts, registry).unwrap();
66
67        Self {
68            running_query_num,
69            rejected_query_counter,
70            completed_query_counter,
71            query_latency,
72        }
73    }
74
75    /// Create a new `DistributedQueryMetrics` instance used in tests or other places.
76    pub fn for_test() -> Self {
77        GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()
78    }
79}