risingwave_storage/hummock/compactor/
task_progress.rs1use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
18
19use parking_lot::Mutex;
20use risingwave_hummock_sdk::HummockCompactionTaskId;
21use risingwave_pb::hummock::CompactTaskProgress;
22
23pub type TaskProgressManagerRef = Arc<Mutex<HashMap<HummockCompactionTaskId, Arc<TaskProgress>>>>;
24
25#[derive(Default)]
27pub struct TaskProgress {
28 num_ssts_sealed: AtomicU32,
29 num_ssts_uploaded: AtomicU32,
30 num_progress_key: AtomicU64,
31 num_pending_read_io: AtomicUsize,
32 num_pending_write_io: AtomicUsize,
33}
34
35impl TaskProgress {
36 pub fn inc_ssts_sealed(&self) {
37 self.num_ssts_sealed.fetch_add(1, Ordering::Relaxed);
38 }
39
40 pub fn inc_ssts_uploaded(&self) {
41 self.num_ssts_uploaded.fetch_add(1, Ordering::Relaxed);
42 }
43
44 pub fn inc_progress_key(&self, inc_key_num: u64) {
45 self.num_progress_key
46 .fetch_add(inc_key_num, Ordering::Relaxed);
47 }
48
49 pub fn inc_num_pending_read_io(&self) {
50 self.num_pending_read_io.fetch_add(1, Ordering::SeqCst);
51 }
52
53 pub fn inc_num_pending_write_io(&self) {
54 self.num_pending_write_io.fetch_add(1, Ordering::SeqCst);
55 }
56
57 pub fn dec_num_pending_read_io(&self) {
58 self.num_pending_read_io.fetch_sub(1, Ordering::SeqCst);
59 }
60
61 pub fn dec_num_pending_write_io(&self) {
62 self.num_pending_write_io.fetch_sub(1, Ordering::SeqCst);
63 }
64
65 pub fn snapshot(&self, task_id: u64) -> CompactTaskProgress {
66 CompactTaskProgress {
67 task_id,
68 num_ssts_sealed: self.num_ssts_sealed.load(Ordering::Relaxed),
69 num_ssts_uploaded: self.num_ssts_uploaded.load(Ordering::Relaxed),
70 num_pending_read_io: self.num_pending_read_io.load(Ordering::Relaxed) as u64,
71 num_pending_write_io: self.num_pending_write_io.load(Ordering::Relaxed) as u64,
72 num_progress_key: self.num_progress_key.load(Ordering::Relaxed),
73 ..Default::default()
74 }
75 }
76}
77
78#[derive(Default)]
80pub struct TaskProgressGuard {
81 task_id: HummockCompactionTaskId,
82 progress_manager: TaskProgressManagerRef,
83
84 pub progress: Arc<TaskProgress>,
85}
86
87impl TaskProgressGuard {
88 pub fn new(task_id: HummockCompactionTaskId, progress_manager: TaskProgressManagerRef) -> Self {
89 let progress = progress_manager.lock().entry(task_id).or_default().clone();
90 Self {
91 task_id,
92 progress_manager,
93 progress,
94 }
95 }
96}
97
98impl Drop for TaskProgressGuard {
99 fn drop(&mut self) {
100 self.progress_manager
102 .lock()
103 .remove(&self.task_id)
104 .expect("task progress should exist when task is finished");
105 }
106}
107
108#[cfg(test)]
109mod tests {
110 use super::{TaskProgressGuard, TaskProgressManagerRef};
111
112 #[test]
113 fn test_progress_removal() {
114 let task_progress_manager = TaskProgressManagerRef::default();
115 {
116 let _guard = TaskProgressGuard::new(1, task_progress_manager.clone());
117 assert_eq!(task_progress_manager.lock().len(), 1);
118 }
119 assert_eq!(task_progress_manager.lock().len(), 0);
120 }
121}