risingwave_common/metrics_reader/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::Result;
18
/// Key for identifying a channel between fragments.
///
/// The key is a pair of plain `u32` identifiers, so it is `Copy` and can be
/// passed by value. Ordering is derived field by field in declaration order
/// (`upstream_fragment_id` first, then `downstream_fragment_id`), which makes
/// it usable in sorted collections and for deterministic output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ChannelKey {
    /// Identifier of the fragment on the upstream side of the channel.
    pub upstream_fragment_id: u32,
    /// Identifier of the fragment on the downstream side of the channel.
    pub downstream_fragment_id: u32,
}
25
/// Channel delta statistics for a specific channel.
///
/// This is a plain value type made of three `f64` fields, so it is `Copy` and
/// comparable with `==` (subject to the usual floating-point caveat that
/// `NaN != NaN`). The `Default` value has every field set to `0.0`.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct ChannelDeltaStats {
    /// Backpressure rate measured for the channel.
    // NOTE(review): unit/normalization is not visible in this file — confirm with the producer.
    pub backpressure_rate: f64,
    /// Receive-side throughput measured for the channel.
    // NOTE(review): unit is not visible in this file — confirm with the producer.
    pub recv_throughput: f64,
    /// Send-side throughput measured for the channel.
    // NOTE(review): unit is not visible in this file — confirm with the producer.
    pub send_throughput: f64,
}
33
/// Entry containing channel delta statistics with fragment IDs.
///
/// This is a flat record carrying both the fragment identifiers of a channel
/// and the statistics measured for it. All fields are `Copy` scalars, so the
/// entry itself is `Copy` and comparable with `==` (with the usual
/// floating-point caveat that `NaN != NaN`).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ChannelDeltaStatsEntry {
    /// Identifier of the fragment on the upstream side of the channel.
    pub upstream_fragment_id: u32,
    /// Identifier of the fragment on the downstream side of the channel.
    pub downstream_fragment_id: u32,
    /// Backpressure rate measured for the channel.
    // NOTE(review): unit/normalization is not visible in this file — confirm with the producer.
    pub backpressure_rate: f64,
    /// Receive-side throughput measured for the channel.
    // NOTE(review): unit is not visible in this file — confirm with the producer.
    pub recv_throughput: f64,
    /// Send-side throughput measured for the channel.
    // NOTE(review): unit is not visible in this file — confirm with the producer.
    pub send_throughput: f64,
}
43
44/// Trait for reading metrics from the meta node via RPC calls.
45#[async_trait::async_trait]
46pub trait MetricsReader: Send + Sync {
47    /// Fetches channel delta statistics from the meta node.
48    ///
49    /// # Arguments
50    /// * `at` - Unix timestamp in seconds for the evaluation time. If None, defaults to current Prometheus server time.
51    /// * `time_offset` - Time offset for throughput and backpressure rate calculation in seconds. If None, defaults to 60s.
52    ///
53    /// # Returns
54    /// * `Result<HashMap<ChannelKey, ChannelDeltaStats>>` - The channel delta stats mapped by channel key or an error
55    async fn get_channel_delta_stats(
56        &self,
57        at: Option<i64>,
58        time_offset: Option<i64>,
59    ) -> Result<HashMap<ChannelKey, ChannelDeltaStats>>;
60}