risingwave_meta/hummock/compaction/picker/
space_reclaim_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::catalog::TableId;
18use risingwave_hummock_sdk::level::{InputLevel, Levels};
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20
21use super::CompactionInput;
22use crate::hummock::level_handler::LevelHandler;
23
24// The execution model of SpaceReclaimCompactionPicker scans through the last level of files by
25// key_range and selects the appropriate files to generate compaction
26pub struct SpaceReclaimCompactionPicker {
27    // config
28    pub _max_space_reclaim_bytes: u64,
29
30    // for filter
31    pub all_table_ids: HashSet<TableId>,
32}
33
/// Scan cursor that `SpaceReclaimCompactionPicker::pick_compaction` carries from one call to the
/// next, so that each call resumes where the previous one stopped.
#[derive(Debug, Default)]
pub struct SpaceReclaimPickerState {
    /// Position of the next scan: `0` means the L0 sub-levels, `k >= 1` means
    /// `levels.levels[k - 1]`. The picker resets it to `0` after a full pass that found nothing.
    pub last_level: usize,
}
40
41impl SpaceReclaimCompactionPicker {
42    pub fn new(max_space_reclaim_bytes: u64, all_table_ids: HashSet<TableId>) -> Self {
43        Self {
44            _max_space_reclaim_bytes: max_space_reclaim_bytes,
45            all_table_ids,
46        }
47    }
48
49    fn exist_table_count(&self, sst: &SstableInfo) -> usize {
50        // it means all the table exist , so we not need to pick this sst
51        sst.table_ids
52            .iter()
53            .filter(|id| self.all_table_ids.contains(id))
54            .count()
55    }
56}
57
58impl SpaceReclaimCompactionPicker {
59    pub fn pick_compaction(
60        &mut self,
61        levels: &Levels,
62        level_handlers: &[LevelHandler],
63        state: &mut SpaceReclaimPickerState,
64    ) -> Option<CompactionInput> {
65        assert!(!levels.levels.is_empty());
66        let mut select_input_ssts = vec![];
67
68        if state.last_level == 0 {
69            let l0 = &levels.l0;
70            // only pick trivial reclaim sstables because this kind of task could be optimized and do not need send to compactor.
71            for level in &l0.sub_levels {
72                for sst in &level.table_infos {
73                    let exist_count = self.exist_table_count(sst);
74                    if exist_count == sst.table_ids.len()
75                        || level_handlers[0].is_pending_compact(&sst.sst_id)
76                    {
77                        if !select_input_ssts.is_empty() {
78                            break;
79                        }
80                    } else if exist_count == 0 {
81                        select_input_ssts.push(sst.clone());
82                    } else if !select_input_ssts.is_empty() {
83                        break;
84                    }
85                }
86                if !select_input_ssts.is_empty() {
87                    return Some(CompactionInput {
88                        select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
89                        total_file_count: select_input_ssts.len() as u64,
90                        input_levels: vec![
91                            InputLevel {
92                                level_idx: level.level_idx,
93                                level_type: level.level_type,
94                                table_infos: select_input_ssts,
95                            },
96                            InputLevel {
97                                level_idx: 0,
98                                level_type: level.level_type,
99                                table_infos: vec![],
100                            },
101                        ],
102                        target_level: level.level_idx as usize,
103                        target_sub_level_id: level.sub_level_id,
104                        ..Default::default()
105                    });
106                }
107            }
108            state.last_level = 1;
109        }
110        while state.last_level <= levels.levels.len() {
111            let mut is_trivial_task = true;
112            for sst in &levels.levels[state.last_level - 1].table_infos {
113                let exist_count = self.exist_table_count(sst);
114                let need_reclaim = exist_count < sst.table_ids.len();
115                let is_trivial_sst = exist_count == 0;
116                if !need_reclaim || level_handlers[state.last_level].is_pending_compact(&sst.sst_id)
117                {
118                    if !select_input_ssts.is_empty() {
119                        // Our goal is to pick as many complete layers of data as possible and keep
120                        // the picked files contiguous to avoid overlapping
121                        // key_ranges, so the strategy is to pick as many
122                        // contiguous files as possible (at least one)
123                        break;
124                    }
125                    continue;
126                }
127
128                if !is_trivial_sst {
129                    if !select_input_ssts.is_empty() {
130                        break;
131                    }
132                    is_trivial_task = false;
133                }
134
135                select_input_ssts.push(sst.clone());
136                if !is_trivial_task {
137                    break;
138                }
139            }
140
141            // turn to next_round
142            if !select_input_ssts.is_empty() {
143                return Some(CompactionInput {
144                    select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
145                    total_file_count: select_input_ssts.len() as u64,
146                    input_levels: vec![
147                        InputLevel {
148                            level_idx: state.last_level as u32,
149                            level_type: levels.levels[state.last_level - 1].level_type,
150                            table_infos: select_input_ssts,
151                        },
152                        InputLevel {
153                            level_idx: state.last_level as u32,
154                            level_type: levels.levels[state.last_level - 1].level_type,
155                            table_infos: vec![],
156                        },
157                    ],
158                    target_level: state.last_level,
159                    ..Default::default()
160                });
161            }
162            state.last_level += 1;
163        }
164        state.last_level = 0;
165        None
166    }
167}
168
#[cfg(test)]
mod test {

    use std::collections::{BTreeSet, HashMap};
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::Level;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
    pub use risingwave_pb::hummock::LevelType;
    use risingwave_pb::hummock::compact_task;

    use super::*;
    use crate::hummock::compaction::CompactionDeveloperConfig;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::selector::tests::{
        assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
        generate_table_with_ids_and_epochs,
    };
    use crate::hummock::compaction::selector::{
        CompactionSelector, LocalSelectorStatistic, SpaceReclaimCompactionSelector,
    };
    use crate::hummock::model::CompactionGroup;
    use crate::hummock::test_utils::compaction_selector_context;

    /// Drives `SpaceReclaimCompactionSelector` through several rounds over one fixed set of
    /// levels, changing the set of member tables and the pending tasks between rounds.
    #[test]
    fn test_space_reclaim_compaction_selector() {
        let max_space_reclaim_bytes = 400;
        let group_config = CompactionGroup::new(
            1,
            CompactionConfigBuilder::new()
                .max_level(4)
                .max_space_reclaim_bytes(max_space_reclaim_bytes)
                .build(),
        );

        // Fixture: empty L0, empty levels 1 and 2, two SSTs in level 3 and ten in level 4.
        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
        assert_eq!(l0.sub_levels.len(), 0);
        let mut levels = vec![
            generate_level(1, vec![]),
            generate_level(2, vec![]),
            generate_level(
                3,
                vec![
                    generate_table_with_ids_and_epochs(0, 1, 150, 151, 1, vec![0], 0, 0),
                    generate_table_with_ids_and_epochs(1, 1, 250, 251, 1, vec![1], 0, 0),
                ],
            ),
            Level {
                level_idx: 4,
                level_type: LevelType::Nonoverlapping,
                // (sst id, left key, right key, table id)
                table_infos: [
                    (2, 0, 100, 2),
                    (3, 101, 200, 3),
                    (4, 222, 300, 4),
                    (5, 333, 400, 5),
                    (6, 444, 500, 6),
                    (7, 555, 600, 7),
                    (8, 666, 700, 8),
                    (9, 777, 800, 9),
                    (10, 888, 1600, 10),
                    (11, 1600, 1800, 10),
                ]
                .into_iter()
                .map(|(sst_id, left, right, table_id)| {
                    generate_table_with_ids_and_epochs(
                        sst_id,
                        1,
                        left,
                        right,
                        1,
                        vec![table_id],
                        0,
                        0,
                    )
                })
                .collect(),
                ..Default::default()
            },
        ];

        {
            // SST 10 ends at 1600 and SST 11 starts at 1600; mark SST 10's right bound as
            // exclusive (presumably so that the two key ranges do not overlap).
            let sst_10 = &mut levels[3].table_infos[8];
            assert_eq!(10, sst_10.sst_id);
            *sst_10 = SstableInfoInner {
                key_range: KeyRange {
                    right_exclusive: true,
                    ..sst_10.get_inner().key_range
                },
                ..sst_10.get_inner()
            }
            .into();
        }

        assert_eq!(levels.len(), 4);
        let levels = Levels {
            levels,
            l0,
            ..Default::default()
        };
        let mut member_table_ids = BTreeSet::new();
        let mut levels_handler = (0..5).map(LevelHandler::new).collect::<Vec<_>>();
        let mut local_stats = LocalSelectorStatistic::default();

        let mut selector = SpaceReclaimCompactionSelector::default();

        // Runs one selection round against the current fixture state and evaluates to whatever
        // the selector returns.
        macro_rules! pick_task {
            () => {
                selector.pick_compaction(
                    1,
                    compaction_selector_context(
                        &group_config,
                        &levels,
                        &member_table_ids,
                        &mut levels_handler,
                        &mut local_stats,
                        &HashMap::default(),
                        Arc::new(CompactionDeveloperConfig::default()),
                        &Default::default(),
                        &HummockVersionStateTableInfo::empty(),
                    ),
                )
            };
        }

        // Removes every pending task from every level handler.
        macro_rules! clear_pending_tasks {
            () => {
                for level_handler in &mut levels_handler {
                    for pending_task_id in &level_handler.pending_tasks_ids() {
                        level_handler.remove_task(*pending_task_id);
                    }
                }
            };
        }

        {
            // No member tables at all: the first round takes both SSTs of level 3.
            let task = pick_task!().unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            let picked = &task.input.input_levels[0];
            assert_eq!(picked.level_idx, 3);
            assert_eq!(picked.table_infos.len(), 2);

            // Mark SST 7 (index 5 of level 4) as pending: the next run must stop before it.
            levels_handler[4].add_pending_task(0, 4, &levels.levels[3].table_infos[5..6]);
            let task = pick_task!().unwrap();
            assert_eq!(task.input.input_levels.len(), 2);
            let picked = &task.input.input_levels[0];
            assert_eq!(picked.level_idx, 4);
            assert_eq!(picked.table_infos.len(), 5);

            // The run is SSTs 2, 3, 4, 5, 6 in order.
            for (expected_id, sst) in (2..).zip(&picked.table_infos) {
                assert_eq!(expected_id, sst.sst_id);
            }

            let placeholder = &task.input.input_levels[1];
            assert_eq!(placeholder.level_idx, 4);
            assert!(placeholder.table_infos.is_empty());
            assert_eq!(task.input.target_level, 4);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));

            // The picked run is larger than the configured `max_space_reclaim_bytes`.
            let select_file_size: u64 = picked.table_infos.iter().map(|sst| sst.sst_size).sum();
            assert!(select_file_size > max_space_reclaim_bytes);
        }

        {
            // The following round continues in level 4 with SSTs 8, 9, 10, 11.
            let task = pick_task!().unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            let picked = &task.input.input_levels[0];
            assert_eq!(picked.level_idx, 4);
            assert_eq!(picked.table_infos.len(), 4);
            assert_eq!(task.input.target_level, 4);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));
            for (expected_id, sst) in (8..).zip(&picked.table_infos) {
                assert_eq!(expected_id, sst.sst_id);
            }

            // Nothing is left to pick afterwards.
            assert!(pick_task!().is_none())
        }

        {
            // Every table is a member again: nothing is reclaimable.
            clear_pending_tasks!();

            member_table_ids = BTreeSet::from_iter((0..=10).map(TableId::new));
            let task = pick_task!();
            assert!(task.is_none());
        }

        {
            // Tables 0, 1 and 10 are dropped: the first hit is level 3 with its two SSTs.
            clear_pending_tasks!();

            member_table_ids = BTreeSet::from_iter((2..=9).map(TableId::new));
            let task = pick_task!().unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            let picked = &task.input.input_levels[0];
            assert_eq!(picked.level_idx, 3);
            assert_eq!(picked.table_infos.len(), 2);
            assert_eq!(task.input.target_level, 3);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));
        }

        {
            // Contiguous selection: surviving tables split level 4 into separate runs.
            clear_pending_tasks!();

            // Start from a fresh selector.
            selector = SpaceReclaimCompactionSelector::default();
            // Surviving tables 0, 1, 2, 5, 7 leave the runs [3, 4], [6] and [8, 9, 10, 11].
            member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new));
            // (expected file count, expected sst ids) for each round.
            let expected_rounds = [
                (2, vec![3, 4]),
                (1, vec![6]),
                (4, vec![8, 9, 10, 11]),
            ];
            for (expected_count, expected_ids) in &expected_rounds {
                let task = pick_task!().unwrap();

                assert_compaction_task(&task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                let picked = &task.input.input_levels[0];
                assert_eq!(picked.level_idx, 4);

                assert_eq!(picked.table_infos.len(), *expected_count);
                let select_sst = picked
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec();
                assert!(select_sst.is_sorted());
                assert_eq!(*expected_ids, select_sst);

                let placeholder = &task.input.input_levels[1];
                assert_eq!(placeholder.level_idx, 4);
                assert!(placeholder.table_infos.is_empty());
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::SpaceReclaim
                ));
            }
        }

        {
            // Contiguous selection where the member tables change before the last round.
            clear_pending_tasks!();

            // Start from a fresh selector.
            selector = SpaceReclaimCompactionSelector::default();

            // The first two rounds see runs [3, 4] and [6]; table 7 is dropped before the last
            // round, which therefore yields [7, 8, 9, 10, 11].
            member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new));
            let expected_rounds = [
                (2, vec![3, 4]),
                (1, vec![6]),
                (5, vec![7, 8, 9, 10, 11]),
            ];
            for (round, (expected_count, expected_ids)) in expected_rounds.iter().enumerate() {
                if round + 1 == expected_rounds.len() {
                    member_table_ids = BTreeSet::from_iter([2, 5].into_iter().map(TableId::new));
                }

                let task = pick_task!().unwrap();

                assert_compaction_task(&task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                let picked = &task.input.input_levels[0];
                assert_eq!(picked.level_idx, 4);

                assert_eq!(picked.table_infos.len(), *expected_count);
                let select_sst = picked
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec();
                assert!(select_sst.is_sorted());
                assert_eq!(*expected_ids, select_sst);

                let placeholder = &task.input.input_levels[1];
                assert_eq!(placeholder.level_idx, 4);
                assert!(placeholder.table_infos.is_empty());
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::SpaceReclaim
                ));
            }
        }
    }
}