risingwave_common/metrics_reader/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::Result;
18
/// Kafka metrics reported for one partition of one source.
///
/// Both offset fields are optional: a value of `None` means that metric is
/// not present for this (`source_id`, `partition_id`) pair.
#[derive(Clone, Debug)]
pub struct KafkaPartitionMetrics {
    /// Identifier of the source this partition belongs to.
    pub source_id: u32,
    /// Identifier of the partition, kept in string form.
    pub partition_id: String,
    /// High watermark of the partition, if present.
    pub high_watermark: Option<i64>,
    /// Latest consumed offset of the partition, if present.
    pub latest_offset: Option<i64>,
}
27
/// Identifies a channel by the pair of fragments it connects.
///
/// Implements `Eq` and `Hash`, so it can be used as a `HashMap` key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ChannelKey {
    /// Fragment on the upstream side of the channel.
    pub upstream_fragment_id: u32,
    /// Fragment on the downstream side of the channel.
    pub downstream_fragment_id: u32,
}
34
/// Delta statistics measured for one channel.
///
/// NOTE(review): the units of these values are not stated in this module;
/// confirm against the producer of these stats before relying on them.
#[derive(Clone, Debug)]
pub struct ChannelDeltaStats {
    /// Backpressure rate observed on the channel.
    pub backpressure_rate: f64,
    /// Throughput on the receiving side of the channel.
    pub recv_throughput: f64,
    /// Throughput on the sending side of the channel.
    pub send_throughput: f64,
}
42
/// A single channel's delta statistics together with the fragment IDs of
/// the channel they describe.
///
/// This is a flat record: it carries the same fields as [`ChannelKey`] and
/// [`ChannelDeltaStats`] side by side in one struct.
#[derive(Clone, Debug)]
pub struct ChannelDeltaStatsEntry {
    /// Fragment on the upstream side of the channel.
    pub upstream_fragment_id: u32,
    /// Fragment on the downstream side of the channel.
    pub downstream_fragment_id: u32,
    /// Backpressure rate observed on the channel.
    pub backpressure_rate: f64,
    /// Throughput on the receiving side of the channel.
    pub recv_throughput: f64,
    /// Throughput on the sending side of the channel.
    pub send_throughput: f64,
}
52
53/// Trait for reading metrics from the meta node via RPC calls.
54#[async_trait::async_trait]
55pub trait MetricsReader: Send + Sync {
56    /// Fetches channel delta statistics from the meta node.
57    ///
58    /// # Arguments
59    /// * `at` - Unix timestamp in seconds for the evaluation time. If None, defaults to current Prometheus server time.
60    /// * `time_offset` - Time offset for throughput and backpressure rate calculation in seconds. If None, defaults to 60s.
61    ///
62    /// # Returns
63    /// * `Result<HashMap<ChannelKey, ChannelDeltaStats>>` - The channel delta stats mapped by channel key or an error
64    async fn get_channel_delta_stats(
65        &self,
66        at: Option<i64>,
67        time_offset: Option<i64>,
68    ) -> Result<HashMap<ChannelKey, ChannelDeltaStats>>;
69
70    /// Fetch Kafka source lag related metrics from Prometheus.
71    ///
72    /// Returns one row per (`source_id`, `partition_id`) with optional high watermark and
73    /// latest consumed offset.
74    async fn get_kafka_source_metrics(&self) -> Result<Vec<KafkaPartitionMetrics>>;
75}