risingwave_meta/hummock/compaction/selector/
emergency_selector.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_hummock_sdk::HummockCompactionTaskId;
16use risingwave_pb::hummock::compact_task;
17
18use super::{CompactionSelector, DynamicLevelSelectorCore};
19use crate::hummock::compaction::picker::{EmergencyCompactionPicker, LocalPickerStatistic};
20use crate::hummock::compaction::selector::CompactionSelectorContext;
21use crate::hummock::compaction::{CompactionTask, create_compaction_task};
22
23#[derive(Default)]
24pub struct EmergencySelector {}
25
26impl CompactionSelector for EmergencySelector {
27    fn pick_compaction(
28        &mut self,
29        task_id: HummockCompactionTaskId,
30        context: CompactionSelectorContext<'_>,
31    ) -> Option<CompactionTask> {
32        let CompactionSelectorContext {
33            group,
34            levels,
35            level_handlers,
36            selector_stats,
37            developer_config,
38            ..
39        } = context;
40        let dynamic_level_core = DynamicLevelSelectorCore::new(
41            group.compaction_config.clone(),
42            developer_config.clone(),
43        );
44        let ctx = dynamic_level_core.calculate_level_base_size(levels);
45        let picker = EmergencyCompactionPicker::new(
46            ctx.base_level,
47            group.compaction_config.clone(),
48            developer_config,
49        );
50
51        let mut stats = LocalPickerStatistic::default();
52        if let Some(compaction_input) = picker.pick_compaction(levels, level_handlers, &mut stats) {
53            compaction_input.add_pending_task(task_id, level_handlers);
54
55            return Some(create_compaction_task(
56                group.compaction_config.as_ref(),
57                compaction_input,
58                ctx.base_level,
59                self.task_type(),
60            ));
61        }
62
63        selector_stats.skip_picker.push((0, ctx.base_level, stats));
64
65        None
66    }
67
68    fn name(&self) -> &'static str {
69        "EmergencyCompaction"
70    }
71
72    fn task_type(&self) -> compact_task::TaskType {
73        compact_task::TaskType::Emergency
74    }
75}