risingwave_common/metrics_reader/mod.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::Result;
18
/// Identifies a channel between two fragments by the IDs of its endpoints.
///
/// Implements `Eq` and `Hash`, so it can be used directly as a `HashMap` key.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ChannelKey {
    /// ID of the fragment at the upstream end of the channel.
    pub upstream_fragment_id: u32,
    /// ID of the fragment at the downstream end of the channel.
    pub downstream_fragment_id: u32,
}
25
/// Delta statistics for a single channel.
///
/// `PartialEq` is derived so values can be compared directly (e.g. in assertions).
/// Because the fields are `f64`, a value containing `NaN` never compares equal,
/// not even to itself.
#[derive(Debug, Clone, PartialEq)]
pub struct ChannelDeltaStats {
    /// Backpressure rate of the channel.
    // NOTE(review): unit/range is not defined in this file — confirm with the producer.
    pub backpressure_rate: f64,
    /// Throughput on the receiving side of the channel.
    // NOTE(review): unit is not defined in this file — confirm with the producer.
    pub recv_throughput: f64,
    /// Throughput on the sending side of the channel.
    // NOTE(review): unit is not defined in this file — confirm with the producer.
    pub send_throughput: f64,
}
33
/// Flat record holding a channel's delta statistics together with the IDs of the
/// fragments at its two ends.
///
/// `PartialEq` is derived so entries can be compared directly (e.g. in assertions).
/// Because the statistic fields are `f64`, an entry containing `NaN` never compares
/// equal, not even to itself.
#[derive(Debug, Clone, PartialEq)]
pub struct ChannelDeltaStatsEntry {
    /// ID of the fragment at the upstream end of the channel.
    pub upstream_fragment_id: u32,
    /// ID of the fragment at the downstream end of the channel.
    pub downstream_fragment_id: u32,
    /// Backpressure rate of the channel.
    // NOTE(review): unit/range is not defined in this file — confirm with the producer.
    pub backpressure_rate: f64,
    /// Throughput on the receiving side of the channel.
    // NOTE(review): unit is not defined in this file — confirm with the producer.
    pub recv_throughput: f64,
    /// Throughput on the sending side of the channel.
    // NOTE(review): unit is not defined in this file — confirm with the producer.
    pub send_throughput: f64,
}
43
44/// Trait for reading metrics from the meta node via RPC calls.
45#[async_trait::async_trait]
46pub trait MetricsReader: Send + Sync {
47 /// Fetches channel delta statistics from the meta node.
48 ///
49 /// # Arguments
50 /// * `at` - Unix timestamp in seconds for the evaluation time. If None, defaults to current Prometheus server time.
51 /// * `time_offset` - Time offset for throughput and backpressure rate calculation in seconds. If None, defaults to 60s.
52 ///
53 /// # Returns
54 /// * `Result<HashMap<ChannelKey, ChannelDeltaStats>>` - The channel delta stats mapped by channel key or an error
55 async fn get_channel_delta_stats(
56 &self,
57 at: Option<i64>,
58 time_offset: Option<i64>,
59 ) -> Result<HashMap<ChannelKey, ChannelDeltaStats>>;
60}