risingwave_meta/hummock/manager/
table_write_throughput_statistic.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, VecDeque};
16
/// A single write-throughput sample for a table.
#[derive(Debug, Clone)]
pub struct TableWriteThroughputStatistic {
    /// The throughput value recorded for this sample.
    pub throughput: u64,
    /// The timestamp, in seconds, at which this sample was recorded.
    pub timestamp_secs: i64,
}

impl AsRef<TableWriteThroughputStatistic> for TableWriteThroughputStatistic {
    /// Returns `self`, so APIs generic over `AsRef<TableWriteThroughputStatistic>`
    /// can accept both owned values and references.
    fn as_ref(&self) -> &Self {
        self
    }
}

impl TableWriteThroughputStatistic {
    /// Returns `true` when this sample is older than `max_statistic_expired_secs`
    /// relative to `timestamp_secs`.
    ///
    /// The age is `timestamp_secs - self.timestamp_secs`, clamped to be
    /// non-negative, so a sample dated later than `timestamp_secs` has age `0`.
    /// The comparison is strict: a sample whose age equals
    /// `max_statistic_expired_secs` is not expired.
    pub fn is_expired(&self, max_statistic_expired_secs: i64, timestamp_secs: i64) -> bool {
        // `saturating_sub` keeps the subtraction from overflowing on extreme
        // timestamps; `max(0)` treats samples dated after `timestamp_secs` as age 0.
        let age_secs = timestamp_secs.saturating_sub(self.timestamp_secs).max(0);
        age_secs > max_statistic_expired_secs
    }
}
35
36#[derive(Debug, Clone)]
37pub struct TableWriteThroughputStatisticManager {
38    table_throughput: HashMap<u32, VecDeque<TableWriteThroughputStatistic>>,
39    max_statistic_expired_secs: i64,
40}
41
42impl TableWriteThroughputStatisticManager {
43    pub fn new(max_statistic_expired_secs: i64) -> Self {
44        Self {
45            table_throughput: HashMap::new(),
46            max_statistic_expired_secs,
47        }
48    }
49
50    pub fn add_table_throughput_with_ts(
51        &mut self,
52        table_id: u32,
53        throughput: u64,
54        timestamp_secs: i64,
55    ) {
56        let table_throughput = self.table_throughput.entry(table_id).or_default();
57        table_throughput.push_back(TableWriteThroughputStatistic {
58            throughput,
59            timestamp_secs,
60        });
61
62        // skip expired statistics
63        while let Some(statistic) = table_throughput.front() {
64            if statistic.is_expired(self.max_statistic_expired_secs, timestamp_secs) {
65                table_throughput.pop_front();
66            } else {
67                break;
68            }
69        }
70
71        if table_throughput.is_empty() {
72            self.table_throughput.remove(&table_id);
73        }
74    }
75
76    // `get_table_throughput` return the statistics of the table with the given `table_id` within the given `window_secs`.
77    // The statistics are sorted by timestamp in descending order.
78    pub fn get_table_throughput_descending(
79        &self,
80        table_id: u32,
81        window_secs: i64,
82    ) -> impl Iterator<Item = &TableWriteThroughputStatistic> {
83        let timestamp_secs = chrono::Utc::now().timestamp();
84        self.table_throughput
85            .get(&table_id)
86            .into_iter()
87            .flatten()
88            .rev()
89            .take_while(move |statistic| !statistic.is_expired(window_secs, timestamp_secs))
90    }
91
92    pub fn remove_table(&mut self, table_id: u32) {
93        self.table_throughput.remove(&table_id);
94    }
95
96    // `avg_write_throughput` returns the average write throughput of the table with the given `table_id` within the given `window_secs`.
97    pub fn avg_write_throughput(&self, table_id: u32, window_secs: i64) -> f64 {
98        let mut total_throughput = 0;
99        let mut total_count = 0;
100        let mut statistic_iter = self
101            .get_table_throughput_descending(table_id, window_secs)
102            .peekable();
103
104        if statistic_iter.peek().is_none() {
105            return 0.0;
106        }
107
108        for statistic in statistic_iter {
109            total_throughput += statistic.throughput;
110            total_count += 1;
111        }
112
113        total_throughput as f64 / total_count as f64
114    }
115}