risingwave_storage/hummock/compactor/
task_progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
18
19use parking_lot::Mutex;
20use risingwave_hummock_sdk::HummockCompactionTaskId;
21use risingwave_pb::hummock::CompactTaskProgress;
22
23pub type TaskProgressManagerRef = Arc<Mutex<HashMap<HummockCompactionTaskId, Arc<TaskProgress>>>>;
24
25/// The progress of a compaction task.
26#[derive(Default)]
27pub struct TaskProgress {
28    num_ssts_sealed: AtomicU32,
29    num_ssts_uploaded: AtomicU32,
30    num_progress_key: AtomicU64,
31    num_pending_read_io: AtomicUsize,
32    num_pending_write_io: AtomicUsize,
33}
34
35impl TaskProgress {
36    pub fn inc_ssts_sealed(&self) {
37        self.num_ssts_sealed.fetch_add(1, Ordering::Relaxed);
38    }
39
40    pub fn inc_ssts_uploaded(&self) {
41        self.num_ssts_uploaded.fetch_add(1, Ordering::Relaxed);
42    }
43
44    pub fn inc_progress_key(&self, inc_key_num: u64) {
45        self.num_progress_key
46            .fetch_add(inc_key_num, Ordering::Relaxed);
47    }
48
49    pub fn inc_num_pending_read_io(&self) {
50        self.num_pending_read_io.fetch_add(1, Ordering::SeqCst);
51    }
52
53    pub fn inc_num_pending_write_io(&self) {
54        self.num_pending_write_io.fetch_add(1, Ordering::SeqCst);
55    }
56
57    pub fn dec_num_pending_read_io(&self) {
58        self.num_pending_read_io.fetch_sub(1, Ordering::SeqCst);
59    }
60
61    pub fn dec_num_pending_write_io(&self) {
62        self.num_pending_write_io.fetch_sub(1, Ordering::SeqCst);
63    }
64
65    pub fn snapshot(&self, task_id: u64) -> CompactTaskProgress {
66        CompactTaskProgress {
67            task_id,
68            num_ssts_sealed: self.num_ssts_sealed.load(Ordering::Relaxed),
69            num_ssts_uploaded: self.num_ssts_uploaded.load(Ordering::Relaxed),
70            num_pending_read_io: self.num_pending_read_io.load(Ordering::Relaxed) as u64,
71            num_pending_write_io: self.num_pending_write_io.load(Ordering::Relaxed) as u64,
72            num_progress_key: self.num_progress_key.load(Ordering::Relaxed),
73            ..Default::default()
74        }
75    }
76}
77
78/// An RAII object that contains a [`TaskProgress`] and shares it to all the splits of the task.
79#[derive(Default)]
80pub struct TaskProgressGuard {
81    task_id: HummockCompactionTaskId,
82    progress_manager: TaskProgressManagerRef,
83
84    pub progress: Arc<TaskProgress>,
85}
86
87impl TaskProgressGuard {
88    pub fn new(task_id: HummockCompactionTaskId, progress_manager: TaskProgressManagerRef) -> Self {
89        let progress = progress_manager.lock().entry(task_id).or_default().clone();
90        Self {
91            task_id,
92            progress_manager,
93            progress,
94        }
95    }
96}
97
98impl Drop for TaskProgressGuard {
99    fn drop(&mut self) {
100        // The entry is created along with `TaskProgress`, so it must exist.
101        self.progress_manager
102            .lock()
103            .remove(&self.task_id)
104            .expect("task progress should exist when task is finished");
105    }
106}
107
108#[cfg(test)]
109mod tests {
110    use super::{TaskProgressGuard, TaskProgressManagerRef};
111
112    #[test]
113    fn test_progress_removal() {
114        let task_progress_manager = TaskProgressManagerRef::default();
115        {
116            let _guard = TaskProgressGuard::new(1, task_progress_manager.clone());
117            assert_eq!(task_progress_manager.lock().len(), 1);
118        }
119        assert_eq!(task_progress_manager.lock().len(), 0);
120    }
121}