risingwave_meta/hummock/compaction/picker/
space_reclaim_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_hummock_sdk::level::{InputLevel, Levels};
18use risingwave_hummock_sdk::sstable_info::SstableInfo;
19
20use super::CompactionInput;
21use crate::hummock::level_handler::LevelHandler;
22
/// Picker that produces space-reclaim compaction inputs.
///
/// `pick_compaction` first walks the L0 sub-levels and then every entry of `levels.levels`
/// in order, selecting contiguous runs of SSTs whose table ids are (partly or entirely)
/// missing from `all_table_ids`.
pub struct SpaceReclaimCompactionPicker {
    /// Configured byte budget for a reclaim task. It is stored by `new`, but the picker
    /// code in this file never reads it (hence the leading underscore).
    pub _max_space_reclaim_bytes: u64,

    /// Table ids that still exist; SSTs are filtered against this set.
    pub all_table_ids: HashSet<u32>,
}
32
/// Scan cursor carried across successive calls to
/// `SpaceReclaimCompactionPicker::pick_compaction`, so that each call resumes where the
/// previous one stopped.
#[derive(Default)]
pub struct SpaceReclaimPickerState {
    /// Level at which the next scan starts: `0` selects the L0 sub-levels, and `n >= 1`
    /// selects `levels.levels[n - 1]`. The picker resets it to `0` once a full pass over
    /// all levels finds nothing to pick.
    pub last_level: usize,
}
39
40impl SpaceReclaimCompactionPicker {
41    pub fn new(max_space_reclaim_bytes: u64, all_table_ids: HashSet<u32>) -> Self {
42        Self {
43            _max_space_reclaim_bytes: max_space_reclaim_bytes,
44            all_table_ids,
45        }
46    }
47
48    fn exist_table_count(&self, sst: &SstableInfo) -> usize {
49        // it means all the table exist , so we not need to pick this sst
50        sst.table_ids
51            .iter()
52            .filter(|id| self.all_table_ids.contains(id))
53            .count()
54    }
55}
56
57impl SpaceReclaimCompactionPicker {
58    pub fn pick_compaction(
59        &mut self,
60        levels: &Levels,
61        level_handlers: &[LevelHandler],
62        state: &mut SpaceReclaimPickerState,
63    ) -> Option<CompactionInput> {
64        assert!(!levels.levels.is_empty());
65        let mut select_input_ssts = vec![];
66
67        if state.last_level == 0 {
68            let l0 = &levels.l0;
69            // only pick trivial reclaim sstables because this kind of task could be optimized and do not need send to compactor.
70            for level in &l0.sub_levels {
71                for sst in &level.table_infos {
72                    let exist_count = self.exist_table_count(sst);
73                    if exist_count == sst.table_ids.len()
74                        || level_handlers[0].is_pending_compact(&sst.sst_id)
75                    {
76                        if !select_input_ssts.is_empty() {
77                            break;
78                        }
79                    } else if exist_count == 0 {
80                        select_input_ssts.push(sst.clone());
81                    } else if !select_input_ssts.is_empty() {
82                        break;
83                    }
84                }
85                if !select_input_ssts.is_empty() {
86                    return Some(CompactionInput {
87                        select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
88                        total_file_count: select_input_ssts.len() as u64,
89                        input_levels: vec![
90                            InputLevel {
91                                level_idx: level.level_idx,
92                                level_type: level.level_type,
93                                table_infos: select_input_ssts,
94                            },
95                            InputLevel {
96                                level_idx: 0,
97                                level_type: level.level_type,
98                                table_infos: vec![],
99                            },
100                        ],
101                        target_level: level.level_idx as usize,
102                        target_sub_level_id: level.sub_level_id,
103                        ..Default::default()
104                    });
105                }
106            }
107            state.last_level = 1;
108        }
109        while state.last_level <= levels.levels.len() {
110            let mut is_trivial_task = true;
111            for sst in &levels.levels[state.last_level - 1].table_infos {
112                let exist_count = self.exist_table_count(sst);
113                let need_reclaim = exist_count < sst.table_ids.len();
114                let is_trivial_sst = exist_count == 0;
115                if !need_reclaim || level_handlers[state.last_level].is_pending_compact(&sst.sst_id)
116                {
117                    if !select_input_ssts.is_empty() {
118                        // Our goal is to pick as many complete layers of data as possible and keep
119                        // the picked files contiguous to avoid overlapping
120                        // key_ranges, so the strategy is to pick as many
121                        // contiguous files as possible (at least one)
122                        break;
123                    }
124                    continue;
125                }
126
127                if !is_trivial_sst {
128                    if !select_input_ssts.is_empty() {
129                        break;
130                    }
131                    is_trivial_task = false;
132                }
133
134                select_input_ssts.push(sst.clone());
135                if !is_trivial_task {
136                    break;
137                }
138            }
139
140            // turn to next_round
141            if !select_input_ssts.is_empty() {
142                return Some(CompactionInput {
143                    select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
144                    total_file_count: select_input_ssts.len() as u64,
145                    input_levels: vec![
146                        InputLevel {
147                            level_idx: state.last_level as u32,
148                            level_type: levels.levels[state.last_level - 1].level_type,
149                            table_infos: select_input_ssts,
150                        },
151                        InputLevel {
152                            level_idx: state.last_level as u32,
153                            level_type: levels.levels[state.last_level - 1].level_type,
154                            table_infos: vec![],
155                        },
156                    ],
157                    target_level: state.last_level,
158                    ..Default::default()
159                });
160            }
161            state.last_level += 1;
162        }
163        state.last_level = 0;
164        None
165    }
166}
167
#[cfg(test)]
mod test {

    use std::collections::{BTreeSet, HashMap};
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::Level;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
    pub use risingwave_pb::hummock::LevelType;
    use risingwave_pb::hummock::compact_task;

    use super::*;
    use crate::hummock::compaction::CompactionDeveloperConfig;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::selector::tests::{
        assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
        generate_table_with_ids_and_epochs,
    };
    use crate::hummock::compaction::selector::{
        CompactionSelector, LocalSelectorStatistic, SpaceReclaimCompactionSelector,
    };
    use crate::hummock::model::CompactionGroup;
    use crate::hummock::test_utils::compaction_selector_context;

    /// Removes every pending task from every level handler so that the next scenario
    /// starts without any SST marked as pending.
    fn clear_pending_tasks(level_handlers: &mut [LevelHandler]) {
        for level_handler in level_handlers {
            for pending_task_id in &level_handler.pending_tasks_ids() {
                level_handler.remove_task(*pending_task_id);
            }
        }
    }

    /// End-to-end scenarios for `SpaceReclaimCompactionSelector`: an empty L0, two SSTs in
    /// level 3 and ten SSTs in level 4, picked under a changing set of existing table ids.
    #[test]
    fn test_space_reclaim_compaction_selector() {
        let max_space_reclaim_bytes = 400;
        let config = CompactionConfigBuilder::new()
            .max_level(4)
            .max_space_reclaim_bytes(max_space_reclaim_bytes)
            .build();
        let group_config = CompactionGroup::new(1, config);

        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
        assert_eq!(l0.sub_levels.len(), 0);
        let mut levels = vec![
            generate_level(1, vec![]),
            generate_level(2, vec![]),
            generate_level(
                3,
                vec![
                    generate_table_with_ids_and_epochs(0, 1, 150, 151, 1, vec![0], 0, 0),
                    generate_table_with_ids_and_epochs(1, 1, 250, 251, 1, vec![1], 0, 0),
                ],
            ),
            Level {
                level_idx: 4,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table_with_ids_and_epochs(2, 1, 0, 100, 1, vec![2], 0, 0),
                    generate_table_with_ids_and_epochs(3, 1, 101, 200, 1, vec![3], 0, 0),
                    generate_table_with_ids_and_epochs(4, 1, 222, 300, 1, vec![4], 0, 0),
                    generate_table_with_ids_and_epochs(5, 1, 333, 400, 1, vec![5], 0, 0),
                    generate_table_with_ids_and_epochs(6, 1, 444, 500, 1, vec![6], 0, 0),
                    generate_table_with_ids_and_epochs(7, 1, 555, 600, 1, vec![7], 0, 0),
                    generate_table_with_ids_and_epochs(8, 1, 666, 700, 1, vec![8], 0, 0),
                    generate_table_with_ids_and_epochs(9, 1, 777, 800, 1, vec![9], 0, 0),
                    generate_table_with_ids_and_epochs(10, 1, 888, 1600, 1, vec![10], 0, 0),
                    generate_table_with_ids_and_epochs(11, 1, 1600, 1800, 1, vec![10], 0, 0),
                ],
                ..Default::default()
            },
        ];

        {
            // sst 10 ends at the key where sst 11 starts, so make its right bound exclusive.
            let sst_10 = levels[3].table_infos.get_mut(8).unwrap();
            assert_eq!(10, sst_10.sst_id);
            *sst_10 = SstableInfoInner {
                key_range: KeyRange {
                    right_exclusive: true,
                    ..sst_10.get_inner().key_range.clone()
                },
                ..sst_10.get_inner()
            }
            .into();
        }

        assert_eq!(levels.len(), 4);
        let levels = Levels {
            levels,
            l0,
            ..Default::default()
        };
        let mut member_table_ids = BTreeSet::new();
        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
        let mut local_stats = LocalSelectorStatistic::default();

        let mut selector = SpaceReclaimCompactionSelector::default();

        // Runs one selector round against the current `member_table_ids` and handlers.
        // A macro (rather than a closure) is used so that the locals it touches are only
        // borrowed for the duration of each individual call.
        macro_rules! pick_task {
            () => {
                selector.pick_compaction(
                    1,
                    compaction_selector_context(
                        &group_config,
                        &levels,
                        &member_table_ids,
                        &mut levels_handler,
                        &mut local_stats,
                        &HashMap::default(),
                        Arc::new(CompactionDeveloperConfig::default()),
                        &Default::default(),
                        &HummockVersionStateTableInfo::empty(),
                    ),
                )
            };
        }

        // Picks one task and checks that it is a space-reclaim task on level 4 whose
        // selected SST ids are exactly `$expected_ids`.
        macro_rules! assert_level_4_pick {
            ($expected_ids:expr) => {{
                let expected_ids = &$expected_ids;
                let task = pick_task!().unwrap();

                assert_compaction_task(&task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                assert_eq!(task.input.input_levels[0].level_idx, 4);

                assert_eq!(
                    task.input.input_levels[0].table_infos.len(),
                    expected_ids.len()
                );
                let select_sst = task.input.input_levels[0]
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec();
                assert!(select_sst.is_sorted());
                assert_eq!(*expected_ids, select_sst);

                assert_eq!(task.input.input_levels[1].level_idx, 4);
                assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::SpaceReclaim
                ));
            }};
        }

        {
            // No table exists, so every SST is reclaimable: level 3 is picked first.
            let task = pick_task!().unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 3);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);

            // Mark sst 7 (index 5 of level 4) as pending; the next pick must stop before it.
            levels_handler[4].add_pending_task(0, 4, &levels.levels[3].table_infos[5..6]);
            let task = pick_task!().unwrap();
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 4);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 5);

            let mut start_id = 2;
            for sst in &task.input.input_levels[0].table_infos {
                assert_eq!(start_id, sst.sst_id);
                start_id += 1;
            }

            assert_eq!(task.input.input_levels[1].level_idx, 4);
            assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
            assert_eq!(task.input.target_level, 4);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));

            // The run is cut by the pending sst 7, not by `max_space_reclaim_bytes`: the
            // selected size is allowed to exceed that value.
            let select_file_size: u64 = task.input.input_levels[0]
                .table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum();
            assert!(select_file_size > max_space_reclaim_bytes);
        }

        {
            // test pick next range: the SSTs after the pending sst 7
            let task = pick_task!().unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 4);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 4);
            assert_eq!(task.input.target_level, 4);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));
            let mut start_id = 8;
            for sst in &task.input.input_levels[0].table_infos {
                assert_eq!(start_id, sst.sst_id);
                start_id += 1;
            }

            // Nothing is left to pick in this pass.
            assert!(pick_task!().is_none());
        }

        {
            // Every table referenced by the SSTs exists again, so nothing is reclaimable.
            clear_pending_tasks(&mut levels_handler);

            member_table_ids = BTreeSet::from_iter(
                [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
                    .into_iter()
                    .map(TableId::new),
            );
            let task = pick_task!();
            assert!(task.is_none());
        }

        {
            // Tables 0, 1 and 10 are gone: the two SSTs of level 3 are picked first.
            clear_pending_tasks(&mut levels_handler);

            member_table_ids =
                BTreeSet::from_iter([2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(TableId::new));
            let task = pick_task!().unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 3);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
            assert_eq!(task.input.target_level, 3);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));
        }

        {
            // test continuous file selection
            clear_pending_tasks(&mut levels_handler);

            // rebuild selector
            selector = SpaceReclaimCompactionSelector::default();
            // Existing tables cut level 4 into the runs [3,4] [6] [8,9,10,11].
            member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new));
            let expect_task_sst_id_range = [vec![3, 4], vec![6], vec![8, 9, 10, 11]];
            for expected_ids in &expect_task_sst_id_range {
                assert_level_4_pick!(*expected_ids);
            }
        }

        {
            // test continuous file selection with filter change
            clear_pending_tasks(&mut levels_handler);

            // rebuild selector
            selector = SpaceReclaimCompactionSelector::default();
            // Runs start as [3,4] [6] [8,9,10,11]; table 7 is dropped before the last pick,
            // which therefore becomes [7,8,9,10,11].
            member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new));
            let expect_task_sst_id_range = [vec![3, 4], vec![6], vec![7, 8, 9, 10, 11]];
            for (index, expected_ids) in expect_task_sst_id_range.iter().enumerate() {
                if index == expect_task_sst_id_range.len() - 1 {
                    member_table_ids = BTreeSet::from_iter([2, 5].into_iter().map(TableId::new));
                }

                assert_level_4_pick!(*expected_ids);
            }
        }
    }
}