risingwave_meta/hummock/manager/
table_write_throughput_statistic.rs1use std::collections::{HashMap, VecDeque};
16
17use risingwave_common::catalog::TableId;
18
/// A single write-throughput sample recorded for one table.
#[derive(Debug, Clone)]
pub struct TableWriteThroughputStatistic {
    /// Throughput value of this sample, stored exactly as reported by the caller.
    pub throughput: u64,
    /// Time at which the sample was taken, in seconds. The manager compares it
    /// against `chrono::Utc::now().timestamp()`, i.e. Unix time.
    pub timestamp_secs: i64,
}

/// Identity conversion, so a statistic can be passed wherever
/// `AsRef<TableWriteThroughputStatistic>` is accepted.
impl AsRef<TableWriteThroughputStatistic> for TableWriteThroughputStatistic {
    fn as_ref(&self) -> &TableWriteThroughputStatistic {
        self
    }
}

impl TableWriteThroughputStatistic {
    /// Returns `true` when this sample is more than `max_statistic_expired_secs`
    /// seconds older than the reference time `timestamp_secs`.
    ///
    /// A sample whose timestamp lies after `timestamp_secs` is treated as having
    /// age 0, so it only counts as expired when the limit itself is negative.
    pub fn is_expired(&self, max_statistic_expired_secs: i64, timestamp_secs: i64) -> bool {
        // A plain `-` can overflow `i64` for extreme timestamps (panic with overflow
        // checks, wrap-around otherwise). Saturating keeps the age on the correct
        // side of zero; `max(0)` then clamps "sample from the future" to age 0.
        let age_secs = timestamp_secs
            .saturating_sub(self.timestamp_secs)
            .max(0);
        age_secs > max_statistic_expired_secs
    }
}
37
38#[derive(Debug, Clone)]
39pub struct TableWriteThroughputStatisticManager {
40 table_throughput: HashMap<TableId, VecDeque<TableWriteThroughputStatistic>>,
41 max_statistic_expired_secs: i64,
42}
43
44impl TableWriteThroughputStatisticManager {
45 pub fn new(max_statistic_expired_secs: i64) -> Self {
46 Self {
47 table_throughput: HashMap::new(),
48 max_statistic_expired_secs,
49 }
50 }
51
52 pub fn add_table_throughput_with_ts(
53 &mut self,
54 table_id: TableId,
55 throughput: u64,
56 timestamp_secs: i64,
57 ) {
58 let table_throughput = self.table_throughput.entry(table_id).or_default();
59 table_throughput.push_back(TableWriteThroughputStatistic {
60 throughput,
61 timestamp_secs,
62 });
63
64 while let Some(statistic) = table_throughput.front() {
66 if statistic.is_expired(self.max_statistic_expired_secs, timestamp_secs) {
67 table_throughput.pop_front();
68 } else {
69 break;
70 }
71 }
72
73 if table_throughput.is_empty() {
74 self.table_throughput.remove(&table_id);
75 }
76 }
77
78 pub fn get_table_throughput_descending(
81 &self,
82 table_id: TableId,
83 window_secs: i64,
84 ) -> impl Iterator<Item = &TableWriteThroughputStatistic> {
85 let timestamp_secs = chrono::Utc::now().timestamp();
86 self.table_throughput
87 .get(&table_id)
88 .into_iter()
89 .flatten()
90 .rev()
91 .take_while(move |statistic| !statistic.is_expired(window_secs, timestamp_secs))
92 }
93
94 pub fn remove_table(&mut self, table_id: TableId) {
95 self.table_throughput.remove(&table_id);
96 }
97
98 pub fn avg_write_throughput(&self, table_id: TableId, window_secs: i64) -> f64 {
100 let mut total_throughput = 0;
101 let mut total_count = 0;
102 let mut statistic_iter = self
103 .get_table_throughput_descending(table_id, window_secs)
104 .peekable();
105
106 if statistic_iter.peek().is_none() {
107 return 0.0;
108 }
109
110 for statistic in statistic_iter {
111 total_throughput += statistic.throughput;
112 total_count += 1;
113 }
114
115 total_throughput as f64 / total_count as f64
116 }
117}