// risingwave_meta/hummock/manager/table_write_throughput_statistic.rs
use std::collections::{HashMap, VecDeque};
16
/// A single write-throughput sample for a table.
#[derive(Debug, Clone)]
pub struct TableWriteThroughputStatistic {
    /// Throughput value recorded for this sample.
    pub throughput: u64,
    /// Time at which the sample was recorded, in seconds.
    pub timestamp_secs: i64,
}

impl AsRef<TableWriteThroughputStatistic> for TableWriteThroughputStatistic {
    fn as_ref(&self) -> &TableWriteThroughputStatistic {
        self
    }
}

impl TableWriteThroughputStatistic {
    /// Returns `true` when this sample is more than `max_statistic_expired_secs`
    /// seconds older than `timestamp_secs`.
    ///
    /// A sample stamped later than `timestamp_secs` is treated as having age 0,
    /// so it only counts as expired when `max_statistic_expired_secs` is negative.
    pub fn is_expired(&self, max_statistic_expired_secs: i64, timestamp_secs: i64) -> bool {
        // `saturating_sub` keeps extreme timestamp pairs from overflowing: a plain
        // `-` would panic in debug builds and wrap (flipping the answer) in release.
        // `max(0)` clamps samples from the "future" to age 0.
        let age_secs = timestamp_secs.saturating_sub(self.timestamp_secs).max(0);
        age_secs > max_statistic_expired_secs
    }
}
35
36#[derive(Debug, Clone)]
37pub struct TableWriteThroughputStatisticManager {
38 table_throughput: HashMap<u32, VecDeque<TableWriteThroughputStatistic>>,
39 max_statistic_expired_secs: i64,
40}
41
42impl TableWriteThroughputStatisticManager {
43 pub fn new(max_statistic_expired_secs: i64) -> Self {
44 Self {
45 table_throughput: HashMap::new(),
46 max_statistic_expired_secs,
47 }
48 }
49
50 pub fn add_table_throughput_with_ts(
51 &mut self,
52 table_id: u32,
53 throughput: u64,
54 timestamp_secs: i64,
55 ) {
56 let table_throughput = self.table_throughput.entry(table_id).or_default();
57 table_throughput.push_back(TableWriteThroughputStatistic {
58 throughput,
59 timestamp_secs,
60 });
61
62 while let Some(statistic) = table_throughput.front() {
64 if statistic.is_expired(self.max_statistic_expired_secs, timestamp_secs) {
65 table_throughput.pop_front();
66 } else {
67 break;
68 }
69 }
70
71 if table_throughput.is_empty() {
72 self.table_throughput.remove(&table_id);
73 }
74 }
75
76 pub fn get_table_throughput_descending(
79 &self,
80 table_id: u32,
81 window_secs: i64,
82 ) -> impl Iterator<Item = &TableWriteThroughputStatistic> {
83 let timestamp_secs = chrono::Utc::now().timestamp();
84 self.table_throughput
85 .get(&table_id)
86 .into_iter()
87 .flatten()
88 .rev()
89 .take_while(move |statistic| !statistic.is_expired(window_secs, timestamp_secs))
90 }
91
92 pub fn remove_table(&mut self, table_id: u32) {
93 self.table_throughput.remove(&table_id);
94 }
95
96 pub fn avg_write_throughput(&self, table_id: u32, window_secs: i64) -> f64 {
98 let mut total_throughput = 0;
99 let mut total_count = 0;
100 let mut statistic_iter = self
101 .get_table_throughput_descending(table_id, window_secs)
102 .peekable();
103
104 if statistic_iter.peek().is_none() {
105 return 0.0;
106 }
107
108 for statistic in statistic_iter {
109 total_throughput += statistic.throughput;
110 total_count += 1;
111 }
112
113 total_throughput as f64 / total_count as f64
114 }
115}