Skip to main content

risingwave_meta/hummock/compaction/selector/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20mod emergency_selector;
21pub(crate) mod level_selector;
22mod manual_selector;
23mod space_reclaim_selector;
24mod tombstone_compaction_selector;
25mod ttl_selector;
26mod vnode_watermark_selector;
27
28use std::collections::{BTreeSet, HashMap};
29use std::sync::Arc;
30
31pub use emergency_selector::EmergencySelector;
32pub use level_selector::{DynamicLevelSelector, DynamicLevelSelectorCore};
33pub use manual_selector::{ManualCompactionOption, ManualCompactionSelector};
34use risingwave_common::catalog::{TableId, TableOption};
35use risingwave_hummock_sdk::level::Levels;
36use risingwave_hummock_sdk::table_watermark::TableWatermarks;
37use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
38use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
39use risingwave_pb::hummock::compact_task;
40pub use space_reclaim_selector::SpaceReclaimCompactionSelector;
41pub use tombstone_compaction_selector::TombstoneCompactionSelector;
42pub use ttl_selector::TtlCompactionSelector;
43pub use vnode_watermark_selector::VnodeWatermarkCompactionSelector;
44
45use super::in_progress_compaction::InProgressCompactionView;
46use super::picker::LocalPickerStatistic;
47use super::{
48    CompactionDeveloperConfig, LevelCompactionPicker, TierCompactionPicker, create_compaction_task,
49};
50use crate::hummock::compaction::CompactionTask;
51use crate::hummock::level_handler::LevelHandler;
52use crate::hummock::model::CompactionGroup;
53use crate::rpc::metrics::MetaMetrics;
54
/// Bundle of inputs handed by value to [`CompactionSelector::pick_compaction`].
///
/// Every field except `developer_config` is a borrow tied to `'a`. `level_handlers` and
/// `selector_stats` are mutable borrows, so a selector is able to update them while picking.
pub struct CompactionSelectorContext<'a> {
    /// Borrowed compaction group model (presumably the group a task is being picked for —
    /// confirm against callers).
    pub group: &'a CompactionGroup,
    /// Borrowed `Levels` structure describing the group's levels.
    pub levels: &'a Levels,
    /// Ordered set of table ids (presumably the tables belonging to `group` — confirm).
    pub member_table_ids: &'a BTreeSet<TableId>,
    /// Mutable slice of per-level handlers. The test helper in this file indexes it by
    /// `level_idx`.
    pub level_handlers: &'a mut [LevelHandler],
    /// Mutable statistics accumulator; see [`LocalSelectorStatistic`].
    pub selector_stats: &'a mut LocalSelectorStatistic,
    /// Per-table options keyed by table id.
    pub table_id_to_options: &'a HashMap<TableId, TableOption>,
    /// Shared (reference-counted) developer configuration for compaction.
    pub developer_config: Arc<CompactionDeveloperConfig>,
    /// Per-table watermarks keyed by table id; each value is shared via `Arc`.
    pub table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
    /// Borrowed state-table info of the Hummock version.
    pub state_table_info: &'a HummockVersionStateTableInfo,
    /// Borrowed view of compactions that are in progress.
    pub in_progress_compactions: &'a InProgressCompactionView,
}
67
/// Interface implemented by every compaction selector in this module.
///
/// Implementors must be `Sync + Send`.
pub trait CompactionSelector: Sync + Send {
    /// Tries to produce a compaction task from `context`.
    ///
    /// `task_id` is the id supplied by the caller for the prospective task. Returns
    /// `Some(task)` when a task was picked and `None` otherwise (presumably meaning there is
    /// nothing to compact right now — confirm with implementors).
    fn pick_compaction(
        &mut self,
        task_id: HummockCompactionTaskId,
        context: CompactionSelectorContext<'_>,
    ) -> Option<CompactionTask>;

    /// Reports selector-specific statistics to `_metrics`.
    ///
    /// The default implementation does nothing.
    fn report_statistic_metrics(&self, _metrics: &MetaMetrics) {}

    /// Static name of the selector.
    fn name(&self) -> &'static str;

    /// The `compact_task::TaskType` associated with this selector.
    fn task_type(&self) -> compact_task::TaskType;
}
81
82pub fn default_compaction_selector() -> Box<dyn CompactionSelector> {
83    Box::<DynamicLevelSelector>::default()
84}
85
86#[derive(Default)]
87pub struct LocalSelectorStatistic {
88    skip_picker: Vec<(usize, usize, LocalPickerStatistic)>,
89}
90
91impl LocalSelectorStatistic {
92    pub fn report_to_metrics(&self, group_id: CompactionGroupId, metrics: &MetaMetrics) {
93        for (start_level, target_level, stats) in &self.skip_picker {
94            let level_label = format!("cg{}-{}-to-{}", group_id, start_level, target_level);
95            if stats.skip_by_write_amp_limit > 0 {
96                metrics
97                    .compact_skip_frequency
98                    .with_label_values(&[level_label.as_str(), "write-amp"])
99                    .inc();
100            }
101            if stats.skip_by_count_limit > 0 {
102                metrics
103                    .compact_skip_frequency
104                    .with_label_values(&[level_label.as_str(), "count"])
105                    .inc();
106            }
107            if stats.skip_by_pending_files > 0 {
108                metrics
109                    .compact_skip_frequency
110                    .with_label_values(&[level_label.as_str(), "pending-files"])
111                    .inc();
112            }
113            if stats.skip_by_overlapping > 0 {
114                metrics
115                    .compact_skip_frequency
116                    .with_label_values(&[level_label.as_str(), "overlapping"])
117                    .inc();
118            }
119            metrics
120                .compact_skip_frequency
121                .with_label_values(&[level_label.as_str(), "picker"])
122                .inc();
123        }
124    }
125}
126
127#[cfg(test)]
128pub mod tests {
129    use std::ops::Range;
130
131    use itertools::Itertools;
132    use risingwave_hummock_sdk::key_range::KeyRange;
133    use risingwave_hummock_sdk::level::{Level, OverlappingLevel};
134    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
135    use risingwave_pb::hummock::LevelType;
136
137    use super::*;
138    use crate::hummock::test_utils::iterator_test_key_of_epoch;
139
140    pub fn push_table_level0_overlapping(levels: &mut Levels, sst: SstableInfo) {
141        levels.l0.total_file_size += sst.sst_size;
142        levels.l0.sub_levels.push(Level {
143            level_idx: 0,
144            level_type: LevelType::Overlapping,
145            total_file_size: sst.sst_size,
146            uncompressed_file_size: sst.uncompressed_file_size,
147            sub_level_id: sst.sst_id.as_raw_id(),
148            table_infos: vec![sst],
149            ..Default::default()
150        });
151    }
152
153    pub fn push_table_level0_nonoverlapping(levels: &mut Levels, sst: SstableInfo) {
154        push_table_level0_overlapping(levels, sst);
155        levels.l0.sub_levels.last_mut().unwrap().level_type = LevelType::Nonoverlapping;
156    }
157
158    pub fn push_tables_level0_nonoverlapping(levels: &mut Levels, table_infos: Vec<SstableInfo>) {
159        let total_file_size = table_infos.iter().map(|table| table.sst_size).sum::<u64>();
160        let uncompressed_file_size = table_infos
161            .iter()
162            .map(|table| table.uncompressed_file_size)
163            .sum();
164        let sub_level_id = table_infos[0].sst_id.as_raw_id();
165        levels.l0.total_file_size += total_file_size;
166        levels.l0.sub_levels.push(Level {
167            level_idx: 0,
168            level_type: LevelType::Nonoverlapping,
169            total_file_size,
170            sub_level_id,
171            table_infos,
172            uncompressed_file_size,
173            ..Default::default()
174        });
175    }
176
177    pub fn generate_table(
178        id: u64,
179        table_prefix: u64,
180        left: usize,
181        right: usize,
182        epoch: u64,
183    ) -> SstableInfo {
184        generate_table_impl(id, table_prefix, left, right, epoch).into()
185    }
186
187    pub fn generate_table_impl(
188        id: u64,
189        table_prefix: u64,
190        left: usize,
191        right: usize,
192        epoch: u64,
193    ) -> SstableInfoInner {
194        let object_size = (right - left + 1) as u64;
195        SstableInfoInner {
196            object_id: id.into(),
197            sst_id: id.into(),
198            key_range: KeyRange {
199                left: iterator_test_key_of_epoch(table_prefix, left, epoch).into(),
200                right: iterator_test_key_of_epoch(table_prefix, right, epoch).into(),
201                right_exclusive: false,
202            },
203            file_size: object_size,
204            table_ids: vec![(table_prefix as u32).into()],
205            uncompressed_file_size: (right - left + 1) as u64,
206            total_key_count: (right - left + 1) as u64,
207            sst_size: object_size,
208            ..Default::default()
209        }
210    }
211
212    pub fn generate_table_with_ids_and_epochs(
213        id: u64,
214        table_prefix: u64,
215        left: usize,
216        right: usize,
217        epoch: u64,
218        table_ids: Vec<u32>,
219        min_epoch: u64,
220        max_epoch: u64,
221    ) -> SstableInfo {
222        let object_size = (right - left + 1) as u64;
223        SstableInfoInner {
224            object_id: id.into(),
225            sst_id: id.into(),
226            key_range: KeyRange {
227                left: iterator_test_key_of_epoch(table_prefix, left, epoch).into(),
228                right: iterator_test_key_of_epoch(table_prefix, right, epoch).into(),
229                right_exclusive: false,
230            },
231            file_size: object_size,
232            table_ids: table_ids.into_iter().map_into().collect(),
233            uncompressed_file_size: object_size,
234            min_epoch,
235            max_epoch,
236            sst_size: object_size,
237            ..Default::default()
238        }
239        .into()
240    }
241
242    pub fn generate_tables(
243        ids: Range<u64>,
244        keys: Range<usize>,
245        epoch: u64,
246        file_size: u64,
247    ) -> Vec<SstableInfo> {
248        let step = (keys.end - keys.start) / (ids.end - ids.start) as usize;
249        let mut start = keys.start;
250        let mut tables = vec![];
251        for id in ids {
252            let mut table = generate_table_impl(id, 1, start, start + step - 1, epoch);
253            table.file_size = file_size;
254            table.sst_size = file_size;
255            tables.push(table.into());
256            start += step;
257        }
258        tables
259    }
260
261    pub fn generate_level(level_idx: u32, table_infos: Vec<SstableInfo>) -> Level {
262        let total_file_size = table_infos.iter().map(|sst| sst.sst_size).sum();
263        let uncompressed_file_size = table_infos
264            .iter()
265            .map(|sst| sst.uncompressed_file_size)
266            .sum();
267        Level {
268            level_idx,
269            level_type: LevelType::Nonoverlapping,
270            table_infos,
271            total_file_size,
272            sub_level_id: 0,
273            uncompressed_file_size,
274            ..Default::default()
275        }
276    }
277
278    /// Returns a `OverlappingLevel`, with each `table_infos`'s element placed in a nonoverlapping
279    /// sub-level.
280    pub fn generate_l0_nonoverlapping_sublevels(table_infos: Vec<SstableInfo>) -> OverlappingLevel {
281        let total_file_size = table_infos.iter().map(|table| table.sst_size).sum::<u64>();
282        let uncompressed_file_size = table_infos
283            .iter()
284            .map(|table| table.uncompressed_file_size)
285            .sum::<u64>();
286        OverlappingLevel {
287            sub_levels: table_infos
288                .into_iter()
289                .enumerate()
290                .map(|(idx, table)| Level {
291                    level_idx: 0,
292                    level_type: LevelType::Nonoverlapping,
293                    total_file_size: table.sst_size,
294                    uncompressed_file_size: table.uncompressed_file_size,
295                    sub_level_id: idx as u64,
296                    table_infos: vec![table],
297                    ..Default::default()
298                })
299                .collect_vec(),
300            total_file_size,
301            uncompressed_file_size,
302        }
303    }
304
305    pub fn generate_l0_nonoverlapping_multi_sublevels(
306        table_infos: Vec<Vec<SstableInfo>>,
307    ) -> OverlappingLevel {
308        let mut l0 = OverlappingLevel {
309            sub_levels: table_infos
310                .into_iter()
311                .enumerate()
312                .map(|(idx, table)| Level {
313                    level_idx: 0,
314                    level_type: LevelType::Nonoverlapping,
315                    total_file_size: table.iter().map(|table| table.sst_size).sum::<u64>(),
316                    uncompressed_file_size: table
317                        .iter()
318                        .map(|sst| sst.uncompressed_file_size)
319                        .sum::<u64>(),
320                    sub_level_id: idx as u64,
321                    table_infos: table,
322                    ..Default::default()
323                })
324                .collect_vec(),
325            total_file_size: 0,
326            uncompressed_file_size: 0,
327        };
328
329        l0.total_file_size = l0.sub_levels.iter().map(|l| l.total_file_size).sum::<u64>();
330        l0.uncompressed_file_size = l0
331            .sub_levels
332            .iter()
333            .map(|l| l.uncompressed_file_size)
334            .sum::<u64>();
335        l0
336    }
337
338    /// Returns a `OverlappingLevel`, with each `table_infos`'s element placed in a overlapping
339    /// sub-level.
340    pub fn generate_l0_overlapping_sublevels(
341        table_infos: Vec<Vec<SstableInfo>>,
342    ) -> OverlappingLevel {
343        let mut l0 = OverlappingLevel {
344            sub_levels: table_infos
345                .into_iter()
346                .enumerate()
347                .map(|(idx, table)| Level {
348                    level_idx: 0,
349                    level_type: LevelType::Overlapping,
350                    total_file_size: table.iter().map(|table| table.sst_size).sum::<u64>(),
351                    sub_level_id: idx as u64,
352                    table_infos: table.clone(),
353                    uncompressed_file_size: table
354                        .iter()
355                        .map(|sst| sst.uncompressed_file_size)
356                        .sum::<u64>(),
357                    ..Default::default()
358                })
359                .collect_vec(),
360            total_file_size: 0,
361            uncompressed_file_size: 0,
362        };
363        l0.total_file_size = l0.sub_levels.iter().map(|l| l.total_file_size).sum::<u64>();
364        l0.uncompressed_file_size = l0
365            .sub_levels
366            .iter()
367            .map(|l| l.uncompressed_file_size)
368            .sum::<u64>();
369        l0
370    }
371
372    pub fn assert_compaction_task(compact_task: &CompactionTask, level_handlers: &[LevelHandler]) {
373        for i in &compact_task.input.input_levels {
374            for t in &i.table_infos {
375                assert!(level_handlers[i.level_idx as usize].is_pending_compact(&t.sst_id));
376            }
377        }
378    }
379}