risingwave_compute/rpc/service/
exchange_metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use prometheus::core::{AtomicU64, GenericCounterVec};
18use prometheus::{Registry, register_int_counter_vec_with_registry};
19use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
20
21#[derive(Clone)]
22pub struct ExchangeServiceMetrics {
23    pub stream_fragment_exchange_bytes: GenericCounterVec<AtomicU64>,
24}
25
26pub static GLOBAL_EXCHANGE_SERVICE_METRICS: LazyLock<ExchangeServiceMetrics> =
27    LazyLock::new(|| ExchangeServiceMetrics::new(&GLOBAL_METRICS_REGISTRY));
28
29impl ExchangeServiceMetrics {
30    fn new(registry: &Registry) -> Self {
31        let stream_fragment_exchange_bytes = register_int_counter_vec_with_registry!(
32            "stream_exchange_frag_send_size",
33            "Total size of messages that have been send to downstream Fragment",
34            &["up_fragment_id", "down_fragment_id"],
35            registry
36        )
37        .unwrap();
38
39        Self {
40            stream_fragment_exchange_bytes,
41        }
42    }
43}