risingwave_meta/hummock/manager/
table_write_throughput_statistic.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, VecDeque};
16
17use risingwave_common::catalog::TableId;
18
/// A single write-throughput sample together with the time it was recorded.
#[derive(Debug, Clone)]
pub struct TableWriteThroughputStatistic {
    /// Throughput value of this sample. The unit is whatever the recording caller uses;
    /// this type only stores the number.
    pub throughput: u64,
    /// Time the sample was recorded, in seconds.
    pub timestamp_secs: i64,
}

impl AsRef<TableWriteThroughputStatistic> for TableWriteThroughputStatistic {
    /// Returns `self`, so APIs generic over `AsRef<TableWriteThroughputStatistic>` accept
    /// both owned values and references.
    fn as_ref(&self) -> &TableWriteThroughputStatistic {
        self
    }
}

impl TableWriteThroughputStatistic {
    /// Returns `true` when this sample is strictly more than `max_statistic_expired_secs`
    /// seconds older than `timestamp_secs`.
    ///
    /// A sample whose timestamp is later than `timestamp_secs` is treated as having age
    /// zero, so it is expired only if `max_statistic_expired_secs` is negative.
    pub fn is_expired(&self, max_statistic_expired_secs: i64, timestamp_secs: i64) -> bool {
        // `saturating_sub` keeps the difference inside the `i64` range: a plain `-` would
        // panic in debug builds (and wrap in release builds) for extreme timestamps, and
        // clamping with `max(0)` afterwards cannot undo that.
        let age_secs = timestamp_secs.saturating_sub(self.timestamp_secs).max(0);
        age_secs > max_statistic_expired_secs
    }
}
37
38#[derive(Debug, Clone)]
39pub struct TableWriteThroughputStatisticManager {
40    table_throughput: HashMap<TableId, VecDeque<TableWriteThroughputStatistic>>,
41    max_statistic_expired_secs: i64,
42}
43
44impl TableWriteThroughputStatisticManager {
45    pub fn new(max_statistic_expired_secs: i64) -> Self {
46        Self {
47            table_throughput: HashMap::new(),
48            max_statistic_expired_secs,
49        }
50    }
51
52    pub fn add_table_throughput_with_ts(
53        &mut self,
54        table_id: TableId,
55        throughput: u64,
56        timestamp_secs: i64,
57    ) {
58        let table_throughput = self.table_throughput.entry(table_id).or_default();
59        table_throughput.push_back(TableWriteThroughputStatistic {
60            throughput,
61            timestamp_secs,
62        });
63
64        // skip expired statistics
65        while let Some(statistic) = table_throughput.front() {
66            if statistic.is_expired(self.max_statistic_expired_secs, timestamp_secs) {
67                table_throughput.pop_front();
68            } else {
69                break;
70            }
71        }
72
73        if table_throughput.is_empty() {
74            self.table_throughput.remove(&table_id);
75        }
76    }
77
78    // `get_table_throughput` return the statistics of the table with the given `table_id` within the given `window_secs`.
79    // The statistics are sorted by timestamp in descending order.
80    pub fn get_table_throughput_descending(
81        &self,
82        table_id: TableId,
83        window_secs: i64,
84    ) -> impl Iterator<Item = &TableWriteThroughputStatistic> {
85        let timestamp_secs = chrono::Utc::now().timestamp();
86        self.table_throughput
87            .get(&table_id)
88            .into_iter()
89            .flatten()
90            .rev()
91            .take_while(move |statistic| !statistic.is_expired(window_secs, timestamp_secs))
92    }
93
94    pub fn remove_table(&mut self, table_id: TableId) {
95        self.table_throughput.remove(&table_id);
96    }
97
98    // `avg_write_throughput` returns the average write throughput of the table with the given `table_id` within the given `window_secs`.
99    pub fn avg_write_throughput(&self, table_id: TableId, window_secs: i64) -> f64 {
100        let mut total_throughput = 0;
101        let mut total_count = 0;
102        let mut statistic_iter = self
103            .get_table_throughput_descending(table_id, window_secs)
104            .peekable();
105
106        if statistic_iter.peek().is_none() {
107            return 0.0;
108        }
109
110        for statistic in statistic_iter {
111            total_throughput += statistic.throughput;
112            total_count += 1;
113        }
114
115        total_throughput as f64 / total_count as f64
116    }
117}