risingwave_storage/hummock/compactor/
context.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use super::task_progress::TaskProgressManagerRef;
18use crate::hummock::MemoryLimiter;
19use crate::hummock::compactor::CompactionExecutor;
20use crate::hummock::sstable_store::SstableStoreRef;
21use crate::monitor::CompactorMetrics;
22use crate::opts::StorageOpts;
23
24pub type CompactionAwaitTreeRegRef = await_tree::Registry;
25
26pub fn new_compaction_await_tree_reg_ref(config: await_tree::Config) -> CompactionAwaitTreeRegRef {
27    await_tree::Registry::new(config)
28}
29
/// Await-tree keys for compaction tasks.
pub mod await_tree_key {
    /// Await-tree key type for compaction tasks.
    ///
    /// Keys are small `Copy` values that can be compared and hashed.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub enum Compaction {
        /// Key for a compact runner, identified by its task id and split index.
        CompactRunner { task_id: u64, split_index: usize },
        /// Key for a shared-buffer compaction, identified by `id`.
        CompactSharedBuffer { id: usize },
        /// Key for a spawned upload task, identified by `id`.
        SpawnUploadTask { id: usize },
        /// Key for a merging task, identified by `id`.
        MergingTask { id: usize },
    }

    // Re-export the variants so callers can write `await_tree_key::CompactRunner { .. }`
    // without naming the enum.
    pub use self::Compaction::*;
}
42
43/// A `CompactorContext` describes the context of a compactor.
44#[derive(Clone)]
45pub struct CompactorContext {
46    /// Storage options.
47    pub storage_opts: Arc<StorageOpts>,
48
49    /// Sstable store that manages the sstables.
50    pub sstable_store: SstableStoreRef,
51
52    /// Statistics.
53    pub compactor_metrics: Arc<CompactorMetrics>,
54
55    /// True if it is a memory compaction (from shared buffer).
56    pub is_share_buffer_compact: bool,
57
58    pub compaction_executor: Arc<CompactionExecutor>,
59
60    pub memory_limiter: Arc<MemoryLimiter>,
61
62    pub task_progress_manager: TaskProgressManagerRef,
63
64    pub await_tree_reg: Option<CompactionAwaitTreeRegRef>,
65}
66
67impl CompactorContext {
68    pub fn new_local_compact_context(
69        storage_opts: Arc<StorageOpts>,
70        sstable_store: SstableStoreRef,
71        compactor_metrics: Arc<CompactorMetrics>,
72        await_tree_reg: Option<CompactionAwaitTreeRegRef>,
73    ) -> Self {
74        let compaction_executor = if storage_opts.share_buffer_compaction_worker_threads_number == 0
75        {
76            Arc::new(CompactionExecutor::new(None))
77        } else {
78            Arc::new(CompactionExecutor::new(Some(
79                storage_opts.share_buffer_compaction_worker_threads_number as usize,
80            )))
81        };
82
83        // not limit memory for local compact
84        Self {
85            storage_opts,
86            sstable_store,
87            compactor_metrics,
88            is_share_buffer_compact: true,
89            compaction_executor,
90            memory_limiter: MemoryLimiter::unlimit(),
91            task_progress_manager: Default::default(),
92            await_tree_reg,
93        }
94    }
95}