risingwave_meta/hummock/compaction/picker/
manual_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_hummock_sdk::HummockSstableId;
20use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
21use risingwave_hummock_sdk::sstable_info::SstableInfo;
22use risingwave_pb::hummock::LevelType;
23
24use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
25use crate::hummock::compaction::overlap_strategy::{
26    OverlapInfo, OverlapStrategy, RangeOverlapInfo,
27};
28use crate::hummock::compaction::selector::ManualCompactionOption;
29use crate::hummock::level_handler::LevelHandler;
30
31pub struct ManualCompactionPicker {
32    overlap_strategy: Arc<dyn OverlapStrategy>,
33    option: ManualCompactionOption,
34    target_level: usize,
35}
36
37impl ManualCompactionPicker {
38    pub fn new(
39        overlap_strategy: Arc<dyn OverlapStrategy>,
40        option: ManualCompactionOption,
41        target_level: usize,
42    ) -> Self {
43        Self {
44            overlap_strategy,
45            option,
46            target_level,
47        }
48    }
49
50    fn pick_l0_to_sub_level(
51        &self,
52        l0: &OverlappingLevel,
53        level_handlers: &[LevelHandler],
54    ) -> Option<CompactionInput> {
55        assert_eq!(self.option.level, 0);
56        let mut input_levels = vec![];
57        let mut sub_level_id = 0;
58        let mut start_idx = None;
59        let mut end_idx = None;
60        // Decides the range of sub levels as input.
61        // We need pick consecutive sub_levels. See #5217.
62        for (idx, level) in l0.sub_levels.iter().enumerate() {
63            if !self.filter_level_by_option(level) {
64                continue;
65            }
66            if level_handlers[0].is_level_pending_compact(level) {
67                return None;
68            }
69            // Pick this sub_level.
70            if start_idx.is_none() {
71                sub_level_id = level.sub_level_id;
72                start_idx = Some(idx as u64);
73                end_idx = start_idx;
74            } else {
75                end_idx = Some(idx as u64);
76            }
77        }
78        let (start_idx, end_idx) = match (start_idx, end_idx) {
79            (Some(start_idx), Some(end_idx)) => (start_idx, end_idx),
80            _ => {
81                return None;
82            }
83        };
84        // Construct input.
85        for level in l0
86            .sub_levels
87            .iter()
88            .skip(start_idx as usize)
89            .take((end_idx - start_idx + 1) as usize)
90        {
91            input_levels.push(InputLevel {
92                level_idx: 0,
93                level_type: level.level_type,
94                table_infos: level.table_infos.clone(),
95            });
96        }
97        if input_levels.is_empty() {
98            return None;
99        }
100        input_levels.reverse();
101        Some(CompactionInput {
102            input_levels,
103            target_level: 0,
104            target_sub_level_id: sub_level_id,
105            ..Default::default()
106        })
107    }
108
109    fn pick_l0_to_base_level(
110        &self,
111        levels: &Levels,
112        level_handlers: &[LevelHandler],
113    ) -> Option<CompactionInput> {
114        assert!(self.option.level == 0 && self.target_level > 0);
115        for l in 1..self.target_level {
116            assert!(levels.levels[l - 1].table_infos.is_empty());
117        }
118        let l0 = &levels.l0;
119        let mut input_levels = vec![];
120        let mut max_sub_level_idx = usize::MAX;
121        let mut info = self.overlap_strategy.create_overlap_info();
122        // Decides the range of sub levels as input.
123        for (idx, level) in l0.sub_levels.iter().enumerate() {
124            if !self.filter_level_by_option(level) {
125                continue;
126            }
127            if level_handlers[0].is_level_pending_compact(level) {
128                return None;
129            }
130
131            // Pick this sub_level.
132            max_sub_level_idx = idx;
133        }
134        if max_sub_level_idx == usize::MAX {
135            return None;
136        }
137        // Construct input.
138        for idx in 0..=max_sub_level_idx {
139            for table in &l0.sub_levels[idx].table_infos {
140                info.update(&table.key_range);
141            }
142            input_levels.push(InputLevel {
143                level_idx: 0,
144                level_type: l0.sub_levels[idx].level_type,
145                table_infos: l0.sub_levels[idx].table_infos.clone(),
146            })
147        }
148        let target_input_ssts_range =
149            info.check_multiple_overlap(&levels.levels[self.target_level - 1].table_infos);
150        let target_input_ssts = if target_input_ssts_range.is_empty() {
151            vec![]
152        } else {
153            levels.levels[self.target_level - 1].table_infos[target_input_ssts_range].to_vec()
154        };
155        if target_input_ssts
156            .iter()
157            .any(|table| level_handlers[self.target_level].is_pending_compact(&table.sst_id))
158        {
159            return None;
160        }
161        if input_levels.is_empty() {
162            return None;
163        }
164        input_levels.reverse();
165        input_levels.push(InputLevel {
166            level_idx: self.target_level as u32,
167            level_type: LevelType::Nonoverlapping,
168            table_infos: target_input_ssts,
169        });
170
171        Some(CompactionInput {
172            input_levels,
173            target_level: self.target_level,
174            target_sub_level_id: 0,
175            ..Default::default()
176        })
177    }
178
179    /// Returns false if the given `sst` is rejected by filter defined by `option`.
180    /// Otherwise returns true.
181    fn filter_level_by_option(&self, level: &Level) -> bool {
182        let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
183        hint_sst_ids.extend(self.option.sst_ids.iter());
184        if self
185            .overlap_strategy
186            .check_overlap_with_range(&self.option.key_range, &level.table_infos)
187            .is_empty()
188        {
189            return false;
190        }
191        if !hint_sst_ids.is_empty()
192            && !level
193                .table_infos
194                .iter()
195                .any(|t| hint_sst_ids.contains(&t.sst_id))
196        {
197            return false;
198        }
199        if !self.option.internal_table_id.is_empty()
200            && !level.table_infos.iter().any(|sst_info| {
201                sst_info
202                    .table_ids
203                    .iter()
204                    .any(|t| self.option.internal_table_id.contains(t))
205            })
206        {
207            return false;
208        }
209        true
210    }
211}
212
213impl CompactionPicker for ManualCompactionPicker {
214    fn pick_compaction(
215        &mut self,
216        levels: &Levels,
217        level_handlers: &[LevelHandler],
218        _stats: &mut LocalPickerStatistic,
219    ) -> Option<CompactionInput> {
220        if self.option.level == 0 {
221            if !self.option.sst_ids.is_empty() {
222                return self.pick_l0_to_sub_level(&levels.l0, level_handlers);
223            } else if self.target_level > 0 {
224                return self.pick_l0_to_base_level(levels, level_handlers);
225            } else {
226                return None;
227            }
228        }
229        let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
230        hint_sst_ids.extend(self.option.sst_ids.iter());
231        let mut range_overlap_info = RangeOverlapInfo::default();
232        range_overlap_info.update(&self.option.key_range);
233        let level = self.option.level;
234        let target_level = self.target_level;
235        assert!(
236            self.option.level == self.target_level || self.option.level + 1 == self.target_level
237        );
238        // We either include all `select_input_ssts` as input, or return None.
239        let mut select_input_ssts: Vec<SstableInfo> = levels
240            .get_level(self.option.level)
241            .table_infos
242            .iter()
243            .filter(|sst_info| hint_sst_ids.is_empty() || hint_sst_ids.contains(&sst_info.sst_id))
244            .filter(|sst_info| range_overlap_info.check_overlap(sst_info))
245            .filter(|sst_info| {
246                if self.option.internal_table_id.is_empty() {
247                    return true;
248                }
249
250                // to filter sst_file by table_id
251                for table_id in &sst_info.table_ids {
252                    if self.option.internal_table_id.contains(table_id) {
253                        return true;
254                    }
255                }
256                false
257            })
258            .cloned()
259            .collect();
260        if select_input_ssts.is_empty() {
261            return None;
262        }
263        let target_input_ssts = if target_level == level {
264            // For intra level compaction, input SSTs must be consecutive.
265            let (left, _) = levels
266                .get_level(level)
267                .table_infos
268                .iter()
269                .find_position(|p| p.sst_id == select_input_ssts.first().unwrap().sst_id)
270                .unwrap();
271            let (right, _) = levels
272                .get_level(level)
273                .table_infos
274                .iter()
275                .find_position(|p| p.sst_id == select_input_ssts.last().unwrap().sst_id)
276                .unwrap();
277            select_input_ssts = levels.get_level(level).table_infos[left..=right].to_vec();
278            vec![]
279        } else {
280            self.overlap_strategy.check_base_level_overlap(
281                &select_input_ssts,
282                &levels.get_level(target_level).table_infos,
283            )
284        };
285        if select_input_ssts
286            .iter()
287            .any(|table| level_handlers[level].is_pending_compact(&table.sst_id))
288        {
289            return None;
290        }
291        if target_input_ssts
292            .iter()
293            .any(|table| level_handlers[target_level].is_pending_compact(&table.sst_id))
294        {
295            return None;
296        }
297
298        Some(CompactionInput {
299            select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
300            target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
301            total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
302            input_levels: vec![
303                InputLevel {
304                    level_idx: level as u32,
305                    level_type: levels.levels[level - 1].level_type,
306                    table_infos: select_input_ssts,
307                },
308                InputLevel {
309                    level_idx: target_level as u32,
310                    level_type: levels.levels[target_level - 1].level_type,
311                    table_infos: target_input_ssts,
312                },
313            ],
314            target_level,
315            ..Default::default()
316        })
317    }
318}
319
320#[cfg(test)]
321pub mod tests {
322    use std::collections::{BTreeSet, HashMap};
323
324    use bytes::Bytes;
325    use risingwave_hummock_sdk::key_range::KeyRange;
326    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
327    use risingwave_pb::hummock::compact_task;
328
329    use super::*;
330    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
331    use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
332    use crate::hummock::compaction::selector::tests::{
333        assert_compaction_task, generate_l0_nonoverlapping_sublevels,
334        generate_l0_overlapping_sublevels, generate_level, generate_table,
335    };
336    use crate::hummock::compaction::selector::{CompactionSelector, ManualCompactionSelector};
337    use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
338    use crate::hummock::model::CompactionGroup;
339    use crate::hummock::test_utils::{compaction_selector_context, iterator_test_key_of_epoch};
340
341    fn clean_task_state(level_handler: &mut LevelHandler) {
342        for pending_task_id in &level_handler.pending_tasks_ids() {
343            level_handler.remove_task(*pending_task_id);
344        }
345    }
346
347    fn is_l0_to_lbase(compaction_input: &CompactionInput) -> bool {
348        compaction_input
349            .input_levels
350            .iter()
351            .take(compaction_input.input_levels.len() - 1)
352            .all(|i| i.level_idx == 0)
353            && compaction_input
354                .input_levels
355                .iter()
356                .last()
357                .unwrap()
358                .level_idx as usize
359                == compaction_input.target_level
360            && compaction_input.target_level > 0
361    }
362
363    fn is_l0_to_l0(compaction_input: &CompactionInput) -> bool {
364        compaction_input
365            .input_levels
366            .iter()
367            .all(|i| i.level_idx == 0)
368            && compaction_input.target_level == 0
369    }
370
371    #[test]
372    fn test_manual_compaction_picker() {
373        let levels = vec![
374            Level {
375                level_idx: 1,
376                level_type: LevelType::Nonoverlapping,
377                table_infos: vec![
378                    generate_table(0, 1, 0, 100, 1),
379                    generate_table(1, 1, 101, 200, 1),
380                    generate_table(2, 1, 222, 300, 1),
381                ],
382                ..Default::default()
383            },
384            Level {
385                level_idx: 2,
386                level_type: LevelType::Nonoverlapping,
387                table_infos: vec![
388                    generate_table(4, 1, 0, 100, 1),
389                    generate_table(5, 1, 101, 150, 1),
390                    generate_table(6, 1, 151, 201, 1),
391                    generate_table(7, 1, 501, 800, 1),
392                    generate_table(8, 2, 301, 400, 1),
393                ],
394                ..Default::default()
395            },
396        ];
397        let mut levels = Levels {
398            levels,
399            l0: generate_l0_nonoverlapping_sublevels(vec![]),
400            ..Default::default()
401        };
402        let mut levels_handler = vec![
403            LevelHandler::new(0),
404            LevelHandler::new(1),
405            LevelHandler::new(2),
406        ];
407        let mut local_stats = LocalPickerStatistic::default();
408
409        {
410            // test key_range option
411            let option = ManualCompactionOption {
412                level: 1,
413                key_range: KeyRange {
414                    left: Bytes::from(iterator_test_key_of_epoch(1, 0, 1)),
415                    right: Bytes::from(iterator_test_key_of_epoch(1, 201, 1)),
416                    right_exclusive: false,
417                },
418                ..Default::default()
419            };
420
421            let target_level = option.level + 1;
422            let mut picker = ManualCompactionPicker::new(
423                Arc::new(RangeOverlapStrategy::default()),
424                option,
425                target_level,
426            );
427            let result = picker
428                .pick_compaction(&levels, &levels_handler, &mut local_stats)
429                .unwrap();
430            result.add_pending_task(0, &mut levels_handler);
431
432            assert_eq!(2, result.input_levels[0].table_infos.len());
433            assert_eq!(3, result.input_levels[1].table_infos.len());
434        }
435
436        {
437            clean_task_state(&mut levels_handler[1]);
438            clean_task_state(&mut levels_handler[2]);
439
440            // test all key range
441            let option = ManualCompactionOption::default();
442            let target_level = option.level + 1;
443            let mut picker = ManualCompactionPicker::new(
444                Arc::new(RangeOverlapStrategy::default()),
445                option,
446                target_level,
447            );
448            let result = picker
449                .pick_compaction(&levels, &levels_handler, &mut local_stats)
450                .unwrap();
451            result.add_pending_task(0, &mut levels_handler);
452
453            assert_eq!(3, result.input_levels[0].table_infos.len());
454            assert_eq!(3, result.input_levels[1].table_infos.len());
455        }
456
457        {
458            clean_task_state(&mut levels_handler[1]);
459            clean_task_state(&mut levels_handler[2]);
460
461            let level_table_info = &mut levels.levels[0].table_infos;
462            let table_info_1 = &mut level_table_info[1];
463            let mut t_inner = table_info_1.get_inner();
464            t_inner.table_ids.resize(2, 0.into());
465            t_inner.table_ids[0] = 1.into();
466            t_inner.table_ids[1] = 2.into();
467            *table_info_1 = t_inner.into();
468
469            // test internal_table_id
470            let option = ManualCompactionOption {
471                level: 1,
472                internal_table_id: HashSet::from([2.into()]),
473                ..Default::default()
474            };
475
476            let target_level = option.level + 1;
477            let mut picker = ManualCompactionPicker::new(
478                Arc::new(RangeOverlapStrategy::default()),
479                option,
480                target_level,
481            );
482
483            let result = picker
484                .pick_compaction(&levels, &levels_handler, &mut local_stats)
485                .unwrap();
486            result.add_pending_task(0, &mut levels_handler);
487
488            assert_eq!(1, result.input_levels[0].table_infos.len());
489            assert_eq!(2, result.input_levels[1].table_infos.len());
490        }
491
492        {
493            clean_task_state(&mut levels_handler[1]);
494            clean_task_state(&mut levels_handler[2]);
495
496            // include all table_info
497            let level_table_info = &mut levels.levels[0].table_infos;
498            for table_info in level_table_info {
499                let mut t_inner = table_info.get_inner();
500                t_inner.table_ids.resize(2, 0.into());
501                t_inner.table_ids[0] = 1.into();
502                t_inner.table_ids[1] = 2.into();
503                *table_info = t_inner.into();
504            }
505
506            // test key range filter first
507            let option = ManualCompactionOption {
508                sst_ids: vec![],
509                level: 1,
510                key_range: KeyRange {
511                    left: Bytes::from(iterator_test_key_of_epoch(1, 101, 1)),
512                    right: Bytes::from(iterator_test_key_of_epoch(1, 199, 1)),
513                    right_exclusive: false,
514                },
515                internal_table_id: HashSet::from([2.into()]),
516            };
517
518            let target_level = option.level + 1;
519            let mut picker = ManualCompactionPicker::new(
520                Arc::new(RangeOverlapStrategy::default()),
521                option,
522                target_level,
523            );
524
525            let result = picker
526                .pick_compaction(&levels, &levels_handler, &mut local_stats)
527                .unwrap();
528
529            assert_eq!(1, result.input_levels[0].table_infos.len());
530            assert_eq!(2, result.input_levels[1].table_infos.len());
531        }
532    }
533
534    fn generate_test_levels() -> (Levels, Vec<LevelHandler>) {
535        let mut l0 = generate_l0_overlapping_sublevels(vec![
536            vec![
537                generate_table(5, 1, 0, 500, 2),
538                generate_table(6, 2, 600, 1000, 2),
539            ],
540            vec![
541                generate_table(7, 1, 0, 500, 3),
542                generate_table(8, 2, 600, 1000, 3),
543            ],
544            vec![
545                generate_table(9, 1, 300, 500, 4),
546                generate_table(10, 2, 600, 1000, 4),
547            ],
548        ]);
549        // Set a nonoverlapping sub_level.
550        l0.sub_levels[1].level_type = LevelType::Nonoverlapping as _;
551        assert_eq!(l0.sub_levels.len(), 3);
552        let mut levels = vec![
553            Level {
554                level_idx: 1,
555                level_type: LevelType::Nonoverlapping,
556                table_infos: vec![
557                    generate_table(3, 1, 0, 100, 1),
558                    generate_table(4, 2, 2000, 3000, 1),
559                ],
560                ..Default::default()
561            },
562            Level {
563                level_idx: 2,
564                level_type: LevelType::Nonoverlapping,
565                table_infos: vec![
566                    generate_table(1, 1, 0, 100, 1),
567                    generate_table(2, 2, 2000, 3000, 1),
568                ],
569                ..Default::default()
570            },
571        ];
572        // Set internal_table_ids.
573        assert_eq!(levels.len(), 2);
574        for iter in [l0.sub_levels.iter_mut(), levels.iter_mut()] {
575            for (idx, l) in iter.enumerate() {
576                for t in &mut l.table_infos {
577                    let mut t_inner = t.get_inner();
578                    t_inner.table_ids.clear();
579                    if idx == 0 {
580                        t_inner
581                            .table_ids
582                            .push((((t.sst_id.inner() % 2) + 1) as u32).into());
583                    } else {
584                        t_inner.table_ids.push(3.into());
585                    }
586                    *t = t_inner.into();
587                }
588            }
589        }
590        let levels = Levels {
591            levels,
592            l0,
593            ..Default::default()
594        };
595
596        let levels_handler = vec![
597            LevelHandler::new(0),
598            LevelHandler::new(1),
599            LevelHandler::new(2),
600        ];
601        (levels, levels_handler)
602    }
603
604    fn generate_intra_test_levels() -> (Levels, Vec<LevelHandler>) {
605        let l0 = generate_l0_overlapping_sublevels(vec![]);
606        let levels = vec![Level {
607            level_idx: 1,
608            level_type: LevelType::Nonoverlapping,
609            table_infos: vec![
610                generate_table(1, 1, 0, 100, 1),
611                generate_table(2, 2, 100, 200, 1),
612                generate_table(3, 2, 200, 300, 1),
613                generate_table(4, 2, 300, 400, 1),
614            ],
615            ..Default::default()
616        }];
617        let levels = Levels {
618            levels,
619            l0,
620            ..Default::default()
621        };
622
623        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
624        (levels, levels_handler)
625    }
626
627    #[test]
628    fn test_l0_empty() {
629        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
630        let levels = vec![Level {
631            level_idx: 1,
632            level_type: LevelType::Nonoverlapping,
633            table_infos: vec![],
634            total_file_size: 0,
635            sub_level_id: 0,
636            uncompressed_file_size: 0,
637            ..Default::default()
638        }];
639        let levels = Levels {
640            levels,
641            l0,
642            ..Default::default()
643        };
644        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
645        let option = ManualCompactionOption {
646            sst_ids: vec![1.into()],
647            level: 0,
648            key_range: KeyRange {
649                left: Bytes::default(),
650                right: Bytes::default(),
651                right_exclusive: false,
652            },
653            internal_table_id: HashSet::default(),
654        };
655        let mut picker =
656            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 0);
657        assert!(
658            picker
659                .pick_compaction(
660                    &levels,
661                    &levels_handler,
662                    &mut LocalPickerStatistic::default()
663                )
664                .is_none()
665        );
666    }
667
668    #[test]
669    fn test_l0_basic() {
670        let (levels, levels_handler) = generate_test_levels();
671
672        // target_level == 0, None
673        let option = ManualCompactionOption {
674            sst_ids: vec![],
675            level: 0,
676            key_range: KeyRange {
677                left: Bytes::default(),
678                right: Bytes::default(),
679                right_exclusive: false,
680            },
681            internal_table_id: HashSet::default(),
682        };
683        let mut picker = ManualCompactionPicker::new(
684            Arc::new(RangeOverlapStrategy::default()),
685            option.clone(),
686            0,
687        );
688        let mut local_stats = LocalPickerStatistic::default();
689        assert!(
690            picker
691                .pick_compaction(&levels, &levels_handler, &mut local_stats)
692                .is_none()
693        );
694
695        // pick_l0_to_base_level
696        let mut picker =
697            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
698        let mut expected = [vec![5, 6], vec![7, 8], vec![9, 10]];
699        expected.reverse();
700        let result = picker
701            .pick_compaction(&levels, &levels_handler, &mut local_stats)
702            .unwrap();
703        assert_eq!(result.input_levels.len(), 4);
704        assert!(is_l0_to_lbase(&result));
705        assert_eq!(result.target_level, 1);
706        for (l, e) in expected.iter().enumerate().take(3) {
707            assert_eq!(
708                result.input_levels[l]
709                    .table_infos
710                    .iter()
711                    .map(|s| s.sst_id)
712                    .collect_vec(),
713                *e
714            );
715        }
716        assert_eq!(
717            result.input_levels[3].table_infos,
718            vec![levels.levels[0].table_infos[0].clone()]
719        );
720
721        // pick_l0_to_base_level, filtered by key_range
722        let option = ManualCompactionOption {
723            sst_ids: vec![],
724            level: 0,
725            key_range: KeyRange {
726                left: Bytes::from(iterator_test_key_of_epoch(1, 0, 2)),
727                right: Bytes::from(iterator_test_key_of_epoch(1, 200, 2)),
728                right_exclusive: false,
729            },
730            internal_table_id: HashSet::default(),
731        };
732        let mut picker =
733            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
734        let mut expected = [vec![5, 6], vec![7, 8]];
735        expected.reverse();
736        let result = picker
737            .pick_compaction(&levels, &levels_handler, &mut local_stats)
738            .unwrap();
739        assert_eq!(result.input_levels.len(), 3);
740        assert!(is_l0_to_lbase(&result));
741        assert_eq!(result.target_level, 1);
742        for (l, e) in expected.iter().enumerate().take(2) {
743            assert_eq!(
744                result.input_levels[l]
745                    .table_infos
746                    .iter()
747                    .map(|s| s.sst_id)
748                    .collect_vec(),
749                *e
750            );
751        }
752        assert_eq!(
753            result.input_levels[2].table_infos,
754            vec![levels.levels[0].table_infos[0].clone()]
755        );
756    }
757
758    #[test]
759    fn test_l0_to_l0_option_sst_ids() {
760        let (levels, levels_handler) = generate_test_levels();
761        // (input_level, sst_id_filter, expected_result_input_level_ssts)
762        let sst_id_filters = vec![
763            (0, vec![6], vec![vec![5, 6]]),
764            (0, vec![7], vec![vec![7, 8]]),
765            (0, vec![9], vec![vec![9, 10]]),
766            (0, vec![6, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
767            (0, vec![8, 9], vec![vec![7, 8], vec![9, 10]]),
768            (0, vec![6, 8, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
769        ];
770        let mut local_stats = LocalPickerStatistic::default();
771        for (input_level, sst_id_filter, expected) in &sst_id_filters {
772            let expected = expected.iter().rev().cloned().collect_vec();
773            let option = ManualCompactionOption {
774                sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
775                level: *input_level as _,
776                key_range: KeyRange {
777                    left: Bytes::default(),
778                    right: Bytes::default(),
779                    right_exclusive: false,
780                },
781                internal_table_id: HashSet::default(),
782            };
783            let mut picker = ManualCompactionPicker::new(
784                Arc::new(RangeOverlapStrategy::default()),
785                option.clone(),
786                // l0 to l0 will ignore target_level
787                input_level + 1,
788            );
789            let result = picker
790                .pick_compaction(&levels, &levels_handler, &mut local_stats)
791                .unwrap();
792            assert!(is_l0_to_l0(&result));
793            assert_eq!(result.input_levels.len(), expected.len());
794            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
795                assert_eq!(
796                    result.input_levels[i]
797                        .table_infos
798                        .iter()
799                        .map(|s| s.sst_id)
800                        .collect_vec(),
801                    *e
802                );
803            }
804        }
805    }
806
807    #[test]
808    fn test_l0_to_lbase_option_internal_table() {
809        let (levels, mut levels_handler) = generate_test_levels();
810        let input_level = 0;
811        let target_level = input_level + 1;
812        let mut local_stats = LocalPickerStatistic::default();
813        {
814            let option = ManualCompactionOption {
815                sst_ids: vec![],
816                level: input_level,
817                key_range: KeyRange {
818                    left: Bytes::default(),
819                    right: Bytes::default(),
820                    right_exclusive: false,
821                },
822                // No matching internal table id.
823                internal_table_id: HashSet::from([100.into()]),
824            };
825            let mut picker = ManualCompactionPicker::new(
826                Arc::new(RangeOverlapStrategy::default()),
827                option,
828                target_level,
829            );
830            assert!(
831                picker
832                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
833                    .is_none()
834            )
835        }
836
837        {
838            let option = ManualCompactionOption {
839                sst_ids: vec![],
840                level: input_level,
841                key_range: KeyRange {
842                    left: Bytes::default(),
843                    right: Bytes::default(),
844                    right_exclusive: false,
845                },
846                // Include all sub level's table ids
847                internal_table_id: HashSet::from([1.into(), 2.into(), 3.into()]),
848            };
849            let mut picker = ManualCompactionPicker::new(
850                Arc::new(RangeOverlapStrategy::default()),
851                option,
852                target_level,
853            );
854            let result = picker
855                .pick_compaction(&levels, &levels_handler, &mut local_stats)
856                .unwrap();
857            assert_eq!(result.input_levels.len(), 4);
858            assert!(is_l0_to_lbase(&result));
859            assert_eq!(result.target_level, 1);
860            assert!(is_l0_to_lbase(&result));
861            assert_eq!(
862                result
863                    .input_levels
864                    .iter()
865                    .take(3)
866                    .flat_map(|s| s.table_infos.clone())
867                    .map(|s| s.sst_id)
868                    .collect_vec(),
869                vec![9, 10, 7, 8, 5, 6]
870            );
871            assert_eq!(
872                result.input_levels[3]
873                    .table_infos
874                    .iter()
875                    .map(|s| s.sst_id)
876                    .collect_vec(),
877                vec![3]
878            );
879        }
880
881        {
882            let option = ManualCompactionOption {
883                sst_ids: vec![],
884                level: input_level,
885                key_range: KeyRange {
886                    left: Bytes::default(),
887                    right: Bytes::default(),
888                    right_exclusive: false,
889                },
890                // Only include bottom sub level's table id
891                internal_table_id: HashSet::from([3.into()]),
892            };
893            let mut picker = ManualCompactionPicker::new(
894                Arc::new(RangeOverlapStrategy::default()),
895                option,
896                target_level,
897            );
898            let result = picker
899                .pick_compaction(&levels, &levels_handler, &mut local_stats)
900                .unwrap();
901            assert_eq!(result.input_levels.len(), 4);
902            assert!(is_l0_to_lbase(&result));
903            assert_eq!(
904                result
905                    .input_levels
906                    .iter()
907                    .take(3)
908                    .flat_map(|s| s.table_infos.clone())
909                    .map(|s| s.sst_id)
910                    .collect_vec(),
911                vec![9, 10, 7, 8, 5, 6]
912            );
913            assert_eq!(
914                result.input_levels[3]
915                    .table_infos
916                    .iter()
917                    .map(|s| s.sst_id)
918                    .collect_vec(),
919                vec![3]
920            );
921            assert_eq!(result.target_level, 1);
922        }
923
924        {
925            let option = ManualCompactionOption {
926                sst_ids: vec![],
927                level: input_level,
928                key_range: KeyRange {
929                    left: Bytes::default(),
930                    right: Bytes::default(),
931                    right_exclusive: false,
932                },
933                // Only include partial top sub level's table id, but the whole top sub level is
934                // picked.
935                internal_table_id: HashSet::from([1.into()]),
936            };
937            let mut picker = ManualCompactionPicker::new(
938                Arc::new(RangeOverlapStrategy::default()),
939                option,
940                target_level,
941            );
942            let result = picker
943                .pick_compaction(&levels, &levels_handler, &mut local_stats)
944                .unwrap();
945            result.add_pending_task(0, &mut levels_handler);
946            assert_eq!(result.input_levels.len(), 2);
947            assert!(is_l0_to_lbase(&result));
948            assert_eq!(result.target_level, 1);
949            assert_eq!(
950                result
951                    .input_levels
952                    .iter()
953                    .take(1)
954                    .flat_map(|s| s.table_infos.clone())
955                    .map(|s| s.sst_id)
956                    .collect_vec(),
957                vec![5, 6]
958            );
959            assert_eq!(
960                result.input_levels[1]
961                    .table_infos
962                    .iter()
963                    .map(|s| s.sst_id)
964                    .collect_vec(),
965                vec![3]
966            );
967
968            // Pick bottom sub level while top sub level is pending
969            let option = ManualCompactionOption {
970                sst_ids: vec![],
971                level: input_level,
972                key_range: KeyRange {
973                    left: Bytes::default(),
974                    right: Bytes::default(),
975                    right_exclusive: false,
976                },
977                // Only include bottom sub level's table id
978                internal_table_id: HashSet::from([3.into()]),
979            };
980            let mut picker = ManualCompactionPicker::new(
981                Arc::new(RangeOverlapStrategy::default()),
982                option,
983                target_level,
984            );
985            // Because top sub-level is pending.
986            assert!(
987                picker
988                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
989                    .is_none()
990            );
991
992            clean_task_state(&mut levels_handler[0]);
993            clean_task_state(&mut levels_handler[1]);
994        }
995    }
996
997    #[test]
998    fn test_ln_to_lnext_option_internal_table() {
999        let (levels, levels_handler) = generate_test_levels();
1000        let input_level = 1;
1001        let target_level = input_level + 1;
1002        let mut local_stats = LocalPickerStatistic::default();
1003        {
1004            let option = ManualCompactionOption {
1005                sst_ids: vec![],
1006                level: input_level,
1007                key_range: KeyRange {
1008                    left: Bytes::default(),
1009                    right: Bytes::default(),
1010                    right_exclusive: false,
1011                },
1012                // No matching internal table id.
1013                internal_table_id: HashSet::from([100.into()]),
1014            };
1015            let mut picker = ManualCompactionPicker::new(
1016                Arc::new(RangeOverlapStrategy::default()),
1017                option,
1018                target_level,
1019            );
1020            assert!(
1021                picker
1022                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
1023                    .is_none()
1024            )
1025        }
1026
1027        {
1028            let expected_input_level_sst_ids = [vec![4], vec![2]];
1029            let option = ManualCompactionOption {
1030                sst_ids: vec![],
1031                level: input_level,
1032                key_range: KeyRange {
1033                    left: Bytes::default(),
1034                    right: Bytes::default(),
1035                    right_exclusive: false,
1036                },
1037                // Only include partial input level's table id
1038                internal_table_id: HashSet::from([1.into()]),
1039            };
1040            let mut picker = ManualCompactionPicker::new(
1041                Arc::new(RangeOverlapStrategy::default()),
1042                option,
1043                target_level,
1044            );
1045            let result = picker
1046                .pick_compaction(&levels, &levels_handler, &mut local_stats)
1047                .unwrap();
1048            assert_eq!(
1049                result.input_levels.len(),
1050                expected_input_level_sst_ids.len()
1051            );
1052            assert_eq!(result.target_level, target_level);
1053            for (l, e) in expected_input_level_sst_ids
1054                .iter()
1055                .enumerate()
1056                .take(result.input_levels.len())
1057            {
1058                assert_eq!(
1059                    result.input_levels[l]
1060                        .table_infos
1061                        .iter()
1062                        .map(|s| s.sst_id)
1063                        .collect_vec(),
1064                    *e
1065                );
1066            }
1067        }
1068    }
1069
1070    #[test]
1071    fn test_ln_to_lnext_option_sst_ids() {
1072        let (levels, levels_handler) = generate_test_levels();
1073        // (input_level, sst_id_filter, expected_result_input_level_ssts)
1074        let sst_id_filters = vec![
1075            (1, vec![3], vec![vec![3], vec![1]]),
1076            (1, vec![4], vec![vec![4], vec![2]]),
1077            (1, vec![3, 4], vec![vec![3, 4], vec![1, 2]]),
1078        ];
1079        let mut local_stats = LocalPickerStatistic::default();
1080        for (input_level, sst_id_filter, expected) in &sst_id_filters {
1081            let option = ManualCompactionOption {
1082                sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1083                level: *input_level as _,
1084                key_range: KeyRange {
1085                    left: Bytes::default(),
1086                    right: Bytes::default(),
1087                    right_exclusive: false,
1088                },
1089                internal_table_id: HashSet::default(),
1090            };
1091            let mut picker = ManualCompactionPicker::new(
1092                Arc::new(RangeOverlapStrategy::default()),
1093                option.clone(),
1094                input_level + 1,
1095            );
1096            let result = picker
1097                .pick_compaction(&levels, &levels_handler, &mut local_stats)
1098                .unwrap();
1099            assert_eq!(result.input_levels.len(), expected.len());
1100            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1101                assert_eq!(
1102                    result.input_levels[i]
1103                        .table_infos
1104                        .iter()
1105                        .map(|s| s.sst_id)
1106                        .collect_vec(),
1107                    *e
1108                );
1109            }
1110        }
1111    }
1112
1113    #[test]
1114    fn test_ln_to_ln() {
1115        let (levels, levels_handler) = generate_intra_test_levels();
1116        // (input_level, sst_id_filter, expected_result_input_level_ssts)
1117        let sst_id_filters = vec![
1118            (1, vec![1], vec![vec![1], vec![]]),
1119            (1, vec![3], vec![vec![3], vec![]]),
1120            (1, vec![4], vec![vec![4], vec![]]),
1121            (1, vec![3, 4], vec![vec![3, 4], vec![]]),
1122            (1, vec![1, 4], vec![vec![1, 2, 3, 4], vec![]]),
1123            (1, vec![2, 4], vec![vec![2, 3, 4], vec![]]),
1124            (1, vec![1, 3], vec![vec![1, 2, 3], vec![]]),
1125        ];
1126        for (input_level, sst_id_filter, expected) in &sst_id_filters {
1127            let option = ManualCompactionOption {
1128                sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1129                level: *input_level as _,
1130                key_range: KeyRange {
1131                    left: Bytes::default(),
1132                    right: Bytes::default(),
1133                    right_exclusive: false,
1134                },
1135                internal_table_id: HashSet::default(),
1136            };
1137            let mut picker = ManualCompactionPicker::new(
1138                Arc::new(RangeOverlapStrategy::default()),
1139                option.clone(),
1140                *input_level as _,
1141            );
1142            let result = picker
1143                .pick_compaction(
1144                    &levels,
1145                    &levels_handler,
1146                    &mut LocalPickerStatistic::default(),
1147                )
1148                .unwrap();
1149            assert_eq!(result.input_levels.len(), expected.len());
1150            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1151                assert_eq!(
1152                    result.input_levels[i]
1153                        .table_infos
1154                        .iter()
1155                        .map(|s| s.sst_id)
1156                        .collect_vec(),
1157                    *e
1158                );
1159            }
1160        }
1161    }
1162
1163    #[test]
1164    fn test_manual_compaction_selector_l0() {
1165        let config = CompactionConfigBuilder::new().max_level(4).build();
1166        let group_config = CompactionGroup::new(1, config);
1167        let l0 = generate_l0_nonoverlapping_sublevels(vec![
1168            generate_table(0, 1, 0, 500, 1),
1169            generate_table(1, 1, 0, 500, 1),
1170        ]);
1171        assert_eq!(l0.sub_levels.len(), 2);
1172        let levels = vec![
1173            generate_level(1, vec![]),
1174            generate_level(2, vec![]),
1175            generate_level(3, vec![]),
1176            Level {
1177                level_idx: 4,
1178                level_type: LevelType::Nonoverlapping,
1179                table_infos: vec![
1180                    generate_table(2, 1, 0, 100, 1),
1181                    generate_table(3, 1, 101, 200, 1),
1182                    generate_table(4, 1, 222, 300, 1),
1183                ],
1184                ..Default::default()
1185            },
1186        ];
1187        assert_eq!(levels.len(), 4);
1188        let levels = Levels {
1189            levels,
1190            l0,
1191            ..Default::default()
1192        };
1193        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1194        let mut local_stats = LocalSelectorStatistic::default();
1195
1196        // pick_l0_to_sub_level
1197        {
1198            let option = ManualCompactionOption {
1199                sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1200                key_range: KeyRange {
1201                    left: Bytes::default(),
1202                    right: Bytes::default(),
1203                    right_exclusive: false,
1204                },
1205                internal_table_id: HashSet::default(),
1206                level: 0,
1207            };
1208            let mut selector = ManualCompactionSelector::new(option);
1209            let task = selector
1210                .pick_compaction(
1211                    1,
1212                    compaction_selector_context(
1213                        &group_config,
1214                        &levels,
1215                        &BTreeSet::new(),
1216                        &mut levels_handler,
1217                        &mut local_stats,
1218                        &HashMap::default(),
1219                        Arc::new(CompactionDeveloperConfig::default()),
1220                        &Default::default(),
1221                        &HummockVersionStateTableInfo::empty(),
1222                    ),
1223                )
1224                .unwrap();
1225            assert_compaction_task(&task, &levels_handler);
1226            assert_eq!(task.input.input_levels.len(), 2);
1227            assert_eq!(task.input.input_levels[0].level_idx, 0);
1228            assert_eq!(task.input.input_levels[1].level_idx, 0);
1229            assert_eq!(task.input.target_level, 0);
1230        }
1231
1232        for level_handler in &mut levels_handler {
1233            for pending_task_id in &level_handler.pending_tasks_ids() {
1234                level_handler.remove_task(*pending_task_id);
1235            }
1236        }
1237
1238        // pick_l0_to_base_level
1239        {
1240            let option = ManualCompactionOption {
1241                sst_ids: vec![],
1242                key_range: KeyRange {
1243                    left: Bytes::default(),
1244                    right: Bytes::default(),
1245                    right_exclusive: false,
1246                },
1247                internal_table_id: HashSet::default(),
1248                level: 0,
1249            };
1250            let mut selector = ManualCompactionSelector::new(option);
1251            let task = selector
1252                .pick_compaction(
1253                    2,
1254                    compaction_selector_context(
1255                        &group_config,
1256                        &levels,
1257                        &BTreeSet::new(),
1258                        &mut levels_handler,
1259                        &mut local_stats,
1260                        &HashMap::default(),
1261                        Arc::new(CompactionDeveloperConfig::default()),
1262                        &Default::default(),
1263                        &HummockVersionStateTableInfo::empty(),
1264                    ),
1265                )
1266                .unwrap();
1267            assert_compaction_task(&task, &levels_handler);
1268            assert_eq!(task.input.input_levels.len(), 3);
1269            assert_eq!(task.input.input_levels[0].level_idx, 0);
1270            assert_eq!(task.input.input_levels[1].level_idx, 0);
1271            assert_eq!(task.input.input_levels[2].level_idx, 4);
1272            assert_eq!(task.input.target_level, 4);
1273        }
1274    }
1275
1276    /// tests `DynamicLevelSelector::manual_pick_compaction`
1277    #[test]
1278    fn test_manual_compaction_selector() {
1279        let config = CompactionConfigBuilder::new().max_level(4).build();
1280        let group_config = CompactionGroup::new(1, config);
1281        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
1282        assert_eq!(l0.sub_levels.len(), 0);
1283        let levels = vec![
1284            generate_level(1, vec![]),
1285            generate_level(2, vec![]),
1286            generate_level(
1287                3,
1288                vec![
1289                    generate_table(0, 1, 150, 151, 1),
1290                    generate_table(1, 1, 250, 251, 1),
1291                ],
1292            ),
1293            Level {
1294                level_idx: 4,
1295                level_type: LevelType::Nonoverlapping,
1296                table_infos: vec![
1297                    generate_table(2, 1, 0, 100, 1),
1298                    generate_table(3, 1, 101, 200, 1),
1299                    generate_table(4, 1, 222, 300, 1),
1300                    generate_table(5, 1, 333, 400, 1),
1301                    generate_table(6, 1, 444, 500, 1),
1302                    generate_table(7, 1, 555, 600, 1),
1303                ],
1304                ..Default::default()
1305            },
1306        ];
1307        assert_eq!(levels.len(), 4);
1308        let levels = Levels {
1309            levels,
1310            l0,
1311            ..Default::default()
1312        };
1313        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1314        let mut local_stats = LocalSelectorStatistic::default();
1315
1316        // pick l3 -> l4
1317        {
1318            let option = ManualCompactionOption {
1319                sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1320                key_range: KeyRange {
1321                    left: Bytes::default(),
1322                    right: Bytes::default(),
1323                    right_exclusive: false,
1324                },
1325                internal_table_id: HashSet::default(),
1326                level: 3,
1327            };
1328            let mut selector = ManualCompactionSelector::new(option);
1329            let task = selector
1330                .pick_compaction(
1331                    1,
1332                    compaction_selector_context(
1333                        &group_config,
1334                        &levels,
1335                        &BTreeSet::new(),
1336                        &mut levels_handler,
1337                        &mut local_stats,
1338                        &HashMap::default(),
1339                        Arc::new(CompactionDeveloperConfig::default()),
1340                        &Default::default(),
1341                        &HummockVersionStateTableInfo::empty(),
1342                    ),
1343                )
1344                .unwrap();
1345            assert_compaction_task(&task, &levels_handler);
1346            assert_eq!(task.input.input_levels.len(), 2);
1347            assert_eq!(task.input.input_levels[0].level_idx, 3);
1348            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
1349            assert_eq!(task.input.input_levels[1].level_idx, 4);
1350            assert_eq!(task.input.input_levels[1].table_infos.len(), 2);
1351            assert_eq!(task.input.target_level, 4);
1352        }
1353
1354        for level_handler in &mut levels_handler {
1355            for pending_task_id in &level_handler.pending_tasks_ids() {
1356                level_handler.remove_task(*pending_task_id);
1357            }
1358        }
1359
1360        // pick l4 -> l4
1361        {
1362            let option = ManualCompactionOption {
1363                sst_ids: vec![],
1364                key_range: KeyRange {
1365                    left: Bytes::default(),
1366                    right: Bytes::default(),
1367                    right_exclusive: false,
1368                },
1369                internal_table_id: HashSet::default(),
1370                level: 4,
1371            };
1372            let mut selector = ManualCompactionSelector::new(option);
1373            let task = selector
1374                .pick_compaction(
1375                    1,
1376                    compaction_selector_context(
1377                        &group_config,
1378                        &levels,
1379                        &BTreeSet::new(),
1380                        &mut levels_handler,
1381                        &mut local_stats,
1382                        &HashMap::default(),
1383                        Arc::new(CompactionDeveloperConfig::default()),
1384                        &Default::default(),
1385                        &HummockVersionStateTableInfo::empty(),
1386                    ),
1387                )
1388                .unwrap();
1389            assert_compaction_task(&task, &levels_handler);
1390            assert_eq!(task.input.input_levels.len(), 2);
1391            assert_eq!(task.input.input_levels[0].level_idx, 4);
1392            assert_eq!(task.input.input_levels[0].table_infos.len(), 6);
1393            assert_eq!(task.input.input_levels[1].level_idx, 4);
1394            assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
1395            assert_eq!(task.input.target_level, 4);
1396            assert!(matches!(
1397                task.compaction_task_type,
1398                compact_task::TaskType::Manual
1399            ));
1400        }
1401    }
1402}