risingwave_common/metrics_reader/mod.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::Result;
18
/// Kafka metrics for a single source partition.
///
/// Each value describes one (`source_id`, `partition_id`) pair. Either offset may be `None`
/// when it is not available.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KafkaPartitionMetrics {
    /// ID of the source that owns this partition.
    pub source_id: u32,
    /// Kafka partition identifier, kept in its string form.
    pub partition_id: String,
    /// High watermark of the partition, if known.
    pub high_watermark: Option<i64>,
    /// Latest offset consumed from the partition, if known.
    pub latest_offset: Option<i64>,
}
27
/// Identifies a channel by the pair of fragments it connects.
///
/// Serves as the map key of the statistics returned by
/// [`MetricsReader::get_channel_delta_stats`].
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ChannelKey {
    /// Fragment at the upstream end of the channel.
    pub upstream_fragment_id: u32,
    /// Fragment at the downstream end of the channel.
    pub downstream_fragment_id: u32,
}
34
/// Delta statistics observed on one channel between two fragments.
///
/// The rates are calculated using the `time_offset` passed to
/// [`MetricsReader::get_channel_delta_stats`].
#[derive(Clone, Debug)]
pub struct ChannelDeltaStats {
    /// Backpressure rate of the channel.
    pub backpressure_rate: f64,
    /// Receive throughput of the channel.
    pub recv_throughput: f64,
    /// Send throughput of the channel.
    pub send_throughput: f64,
}
42
/// Flattened channel statistics record carrying its own fragment IDs.
///
/// Holds the same data as a [`ChannelKey`] paired with its [`ChannelDeltaStats`], but as a
/// single flat struct.
#[derive(Clone, Debug)]
pub struct ChannelDeltaStatsEntry {
    /// Fragment at the upstream end of the channel.
    pub upstream_fragment_id: u32,
    /// Fragment at the downstream end of the channel.
    pub downstream_fragment_id: u32,
    /// Backpressure rate of the channel.
    pub backpressure_rate: f64,
    /// Receive throughput of the channel.
    pub recv_throughput: f64,
    /// Send throughput of the channel.
    pub send_throughput: f64,
}
52
53/// Trait for reading metrics from the meta node via RPC calls.
54#[async_trait::async_trait]
55pub trait MetricsReader: Send + Sync {
56 /// Fetches channel delta statistics from the meta node.
57 ///
58 /// # Arguments
59 /// * `at` - Unix timestamp in seconds for the evaluation time. If None, defaults to current Prometheus server time.
60 /// * `time_offset` - Time offset for throughput and backpressure rate calculation in seconds. If None, defaults to 60s.
61 ///
62 /// # Returns
63 /// * `Result<HashMap<ChannelKey, ChannelDeltaStats>>` - The channel delta stats mapped by channel key or an error
64 async fn get_channel_delta_stats(
65 &self,
66 at: Option<i64>,
67 time_offset: Option<i64>,
68 ) -> Result<HashMap<ChannelKey, ChannelDeltaStats>>;
69
70 /// Fetch Kafka source lag related metrics from Prometheus.
71 ///
72 /// Returns one row per (`source_id`, `partition_id`) with optional high watermark and
73 /// latest consumed offset.
74 async fn get_kafka_source_metrics(&self) -> Result<Vec<KafkaPartitionMetrics>>;
75}