risingwave_storage/hummock/compactor/
compaction_executor.rs1use std::future::Future;
16
17use more_asserts::assert_gt;
18use risingwave_common::util::resource_util;
19use risingwave_common::util::runtime::BackgroundShutdownRuntime;
20use tokio::task::JoinHandle;
21
22pub struct CompactionExecutor {
24 runtime: BackgroundShutdownRuntime,
26 worker_num: usize,
27}
28
29impl CompactionExecutor {
30 pub fn new(worker_threads_num: Option<usize>) -> Self {
31 let mut worker_num = resource_util::cpu::total_cpu_available().ceil() as usize;
32 let runtime = {
33 let mut builder = tokio::runtime::Builder::new_multi_thread();
34 builder.thread_name("rw-compaction");
35 if let Some(worker_threads_num) = worker_threads_num {
36 builder.worker_threads(worker_threads_num);
37 worker_num = worker_threads_num;
38 }
39 assert_gt!(worker_num, 0);
40 builder.enable_all().build().unwrap()
41 };
42
43 Self {
44 runtime: runtime.into(),
45 worker_num,
46 }
47 }
48
49 pub fn spawn<F, T>(&self, t: F) -> JoinHandle<T>
51 where
52 F: Future<Output = T> + Send + 'static,
53 T: Send + 'static,
54 {
55 self.runtime.spawn(t)
56 }
57
58 pub fn worker_num(&self) -> usize {
59 self.worker_num
60 }
61}