risingwave_storage/hummock/compactor/
compaction_executor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16
17use more_asserts::assert_gt;
18use risingwave_common::util::resource_util;
19use risingwave_common::util::runtime::BackgroundShutdownRuntime;
20use tokio::task::JoinHandle;
21
22/// `CompactionExecutor` is a dedicated runtime for compaction's CPU intensive jobs.
23pub struct CompactionExecutor {
24    /// Runtime for compaction tasks.
25    runtime: BackgroundShutdownRuntime,
26    worker_num: usize,
27}
28
29impl CompactionExecutor {
30    pub fn new(worker_threads_num: Option<usize>) -> Self {
31        let mut worker_num = resource_util::cpu::total_cpu_available().ceil() as usize;
32        let runtime = {
33            let mut builder = tokio::runtime::Builder::new_multi_thread();
34            builder.thread_name("rw-compaction");
35            if let Some(worker_threads_num) = worker_threads_num {
36                builder.worker_threads(worker_threads_num);
37                worker_num = worker_threads_num;
38            }
39            assert_gt!(worker_num, 0);
40            builder.enable_all().build().unwrap()
41        };
42
43        Self {
44            runtime: runtime.into(),
45            worker_num,
46        }
47    }
48
49    /// Send a request to the executor, returns a [`JoinHandle`] to retrieve the result.
50    pub fn spawn<F, T>(&self, t: F) -> JoinHandle<T>
51    where
52        F: Future<Output = T> + Send + 'static,
53        T: Send + 'static,
54    {
55        self.runtime.spawn(t)
56    }
57
58    pub fn worker_num(&self) -> usize {
59        self.worker_num
60    }
61}