risingwave_meta/hummock/compaction/picker/
manual_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_hummock_sdk::HummockSstableId;
20use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
21use risingwave_hummock_sdk::sstable_info::SstableInfo;
22use risingwave_pb::hummock::LevelType;
23
24use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
25use crate::hummock::compaction::overlap_strategy::{
26    OverlapInfo, OverlapStrategy, RangeOverlapInfo,
27};
28use crate::hummock::compaction::selector::ManualCompactionOption;
29use crate::hummock::level_handler::LevelHandler;
30
/// Picks the input SSTs for a manually requested compaction.
///
/// Candidates are narrowed by `option` (source level, SST id hints, key range and internal
/// table ids). Depending on the source level, the picked SSTs are compacted within L0, from L0
/// into `target_level`, or from a non-L0 level into `target_level`.
pub struct ManualCompactionPicker {
    /// Strategy used to decide which SSTs overlap a key range or a set of picked SSTs.
    overlap_strategy: Arc<dyn OverlapStrategy>,
    /// Caller-supplied description of what should be compacted.
    option: ManualCompactionOption,
    /// Level that receives the compaction output. Not consulted when compacting within L0.
    target_level: usize,
}
36
37impl ManualCompactionPicker {
    /// Creates a picker that applies the filters in `option` and compacts into `target_level`,
    /// using `overlap_strategy` for all key-range overlap checks.
    pub fn new(
        overlap_strategy: Arc<dyn OverlapStrategy>,
        option: ManualCompactionOption,
        target_level: usize,
    ) -> Self {
        Self {
            overlap_strategy,
            option,
            target_level,
        }
    }
49
50    fn pick_l0_to_sub_level(
51        &self,
52        l0: &OverlappingLevel,
53        level_handlers: &[LevelHandler],
54    ) -> Option<CompactionInput> {
55        assert_eq!(self.option.level, 0);
56        let mut input_levels = vec![];
57        let mut sub_level_id = 0;
58        let mut start_idx = None;
59        let mut end_idx = None;
60        // Decides the range of sub levels as input.
61        // We need pick consecutive sub_levels. See #5217.
62        for (idx, level) in l0.sub_levels.iter().enumerate() {
63            if !self.filter_level_by_option(level) {
64                continue;
65            }
66            if level_handlers[0].is_level_pending_compact(level) {
67                return None;
68            }
69            // Pick this sub_level.
70            if start_idx.is_none() {
71                sub_level_id = level.sub_level_id;
72                start_idx = Some(idx as u64);
73                end_idx = start_idx;
74            } else {
75                end_idx = Some(idx as u64);
76            }
77        }
78        let (start_idx, end_idx) = match (start_idx, end_idx) {
79            (Some(start_idx), Some(end_idx)) => (start_idx, end_idx),
80            _ => {
81                return None;
82            }
83        };
84        // Construct input.
85        for level in l0
86            .sub_levels
87            .iter()
88            .skip(start_idx as usize)
89            .take((end_idx - start_idx + 1) as usize)
90        {
91            input_levels.push(InputLevel {
92                level_idx: 0,
93                level_type: level.level_type,
94                table_infos: level.table_infos.clone(),
95            });
96        }
97        if input_levels.is_empty() {
98            return None;
99        }
100        input_levels.reverse();
101        Some(CompactionInput {
102            input_levels,
103            target_level: 0,
104            target_sub_level_id: sub_level_id,
105            ..Default::default()
106        })
107    }
108
109    fn pick_l0_to_base_level(
110        &self,
111        levels: &Levels,
112        level_handlers: &[LevelHandler],
113    ) -> Option<CompactionInput> {
114        assert!(self.option.level == 0 && self.target_level > 0);
115        for l in 1..self.target_level {
116            assert!(levels.levels[l - 1].table_infos.is_empty());
117        }
118        let l0 = &levels.l0;
119        let mut input_levels = vec![];
120        let mut max_sub_level_idx = usize::MAX;
121        let mut info = self.overlap_strategy.create_overlap_info();
122        // Decides the range of sub levels as input.
123        for (idx, level) in l0.sub_levels.iter().enumerate() {
124            if !self.filter_level_by_option(level) {
125                continue;
126            }
127            if level_handlers[0].is_level_pending_compact(level) {
128                return None;
129            }
130
131            // Pick this sub_level.
132            max_sub_level_idx = idx;
133        }
134        if max_sub_level_idx == usize::MAX {
135            return None;
136        }
137        // Construct input.
138        for idx in 0..=max_sub_level_idx {
139            for table in &l0.sub_levels[idx].table_infos {
140                info.update(&table.key_range);
141            }
142            input_levels.push(InputLevel {
143                level_idx: 0,
144                level_type: l0.sub_levels[idx].level_type,
145                table_infos: l0.sub_levels[idx].table_infos.clone(),
146            })
147        }
148        let target_input_ssts_range =
149            info.check_multiple_overlap(&levels.levels[self.target_level - 1].table_infos);
150        let target_input_ssts = if target_input_ssts_range.is_empty() {
151            vec![]
152        } else {
153            levels.levels[self.target_level - 1].table_infos[target_input_ssts_range].to_vec()
154        };
155        if target_input_ssts
156            .iter()
157            .any(|table| level_handlers[self.target_level].is_pending_compact(&table.sst_id))
158        {
159            return None;
160        }
161        if input_levels.is_empty() {
162            return None;
163        }
164        input_levels.reverse();
165        input_levels.push(InputLevel {
166            level_idx: self.target_level as u32,
167            level_type: LevelType::Nonoverlapping,
168            table_infos: target_input_ssts,
169        });
170
171        Some(CompactionInput {
172            input_levels,
173            target_level: self.target_level,
174            target_sub_level_id: 0,
175            ..Default::default()
176        })
177    }
178
179    /// Returns false if the given `sst` is rejected by filter defined by `option`.
180    /// Otherwise returns true.
181    fn filter_level_by_option(&self, level: &Level) -> bool {
182        let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
183        hint_sst_ids.extend(self.option.sst_ids.iter());
184        if self
185            .overlap_strategy
186            .check_overlap_with_range(&self.option.key_range, &level.table_infos)
187            .is_empty()
188        {
189            return false;
190        }
191        if !hint_sst_ids.is_empty()
192            && !level
193                .table_infos
194                .iter()
195                .any(|t| hint_sst_ids.contains(&t.sst_id))
196        {
197            return false;
198        }
199        if !self.option.internal_table_id.is_empty()
200            && !level.table_infos.iter().any(|sst_info| {
201                sst_info
202                    .table_ids
203                    .iter()
204                    .any(|t| self.option.internal_table_id.contains(t))
205            })
206        {
207            return false;
208        }
209        true
210    }
211}
212
213impl CompactionPicker for ManualCompactionPicker {
214    fn pick_compaction(
215        &mut self,
216        levels: &Levels,
217        level_handlers: &[LevelHandler],
218        _stats: &mut LocalPickerStatistic,
219    ) -> Option<CompactionInput> {
220        if self.option.level == 0 {
221            if !self.option.sst_ids.is_empty() {
222                return self.pick_l0_to_sub_level(&levels.l0, level_handlers);
223            } else if self.target_level > 0 {
224                return self.pick_l0_to_base_level(levels, level_handlers);
225            } else {
226                return None;
227            }
228        }
229        let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
230        hint_sst_ids.extend(self.option.sst_ids.iter());
231        let mut range_overlap_info = RangeOverlapInfo::default();
232        range_overlap_info.update(&self.option.key_range);
233        let level = self.option.level;
234        let target_level = self.target_level;
235        assert!(
236            self.option.level == self.target_level || self.option.level + 1 == self.target_level
237        );
238        // We either include all `select_input_ssts` as input, or return None.
239        let mut select_input_ssts: Vec<SstableInfo> = levels
240            .get_level(self.option.level)
241            .table_infos
242            .iter()
243            .filter(|sst_info| hint_sst_ids.is_empty() || hint_sst_ids.contains(&sst_info.sst_id))
244            .filter(|sst_info| range_overlap_info.check_overlap(sst_info))
245            .filter(|sst_info| {
246                if self.option.internal_table_id.is_empty() {
247                    return true;
248                }
249
250                // to filter sst_file by table_id
251                for table_id in &sst_info.table_ids {
252                    if self.option.internal_table_id.contains(table_id) {
253                        return true;
254                    }
255                }
256                false
257            })
258            .cloned()
259            .collect();
260        if select_input_ssts.is_empty() {
261            return None;
262        }
263        let target_input_ssts = if target_level == level {
264            // For intra level compaction, input SSTs must be consecutive.
265            let (left, _) = levels
266                .get_level(level)
267                .table_infos
268                .iter()
269                .find_position(|p| p.sst_id == select_input_ssts.first().unwrap().sst_id)
270                .unwrap();
271            let (right, _) = levels
272                .get_level(level)
273                .table_infos
274                .iter()
275                .find_position(|p| p.sst_id == select_input_ssts.last().unwrap().sst_id)
276                .unwrap();
277            select_input_ssts = levels.get_level(level).table_infos[left..=right].to_vec();
278            vec![]
279        } else {
280            self.overlap_strategy.check_base_level_overlap(
281                &select_input_ssts,
282                &levels.get_level(target_level).table_infos,
283            )
284        };
285        if select_input_ssts
286            .iter()
287            .any(|table| level_handlers[level].is_pending_compact(&table.sst_id))
288        {
289            return None;
290        }
291        if target_input_ssts
292            .iter()
293            .any(|table| level_handlers[target_level].is_pending_compact(&table.sst_id))
294        {
295            return None;
296        }
297
298        Some(CompactionInput {
299            select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
300            target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
301            total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
302            input_levels: vec![
303                InputLevel {
304                    level_idx: level as u32,
305                    level_type: levels.levels[level - 1].level_type,
306                    table_infos: select_input_ssts,
307                },
308                InputLevel {
309                    level_idx: target_level as u32,
310                    level_type: levels.levels[target_level - 1].level_type,
311                    table_infos: target_input_ssts,
312                },
313            ],
314            target_level,
315            ..Default::default()
316        })
317    }
318}
319
320#[cfg(test)]
321pub mod tests {
322    use std::collections::{BTreeSet, HashMap};
323
324    use bytes::Bytes;
325    use risingwave_hummock_sdk::key_range::KeyRange;
326    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
327    use risingwave_pb::hummock::compact_task;
328
329    use super::*;
330    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
331    use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
332    use crate::hummock::compaction::selector::tests::{
333        assert_compaction_task, generate_l0_nonoverlapping_sublevels,
334        generate_l0_overlapping_sublevels, generate_level, generate_table,
335    };
336    use crate::hummock::compaction::selector::{CompactionSelector, ManualCompactionSelector};
337    use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
338    use crate::hummock::model::CompactionGroup;
339    use crate::hummock::test_utils::{compaction_selector_context, iterator_test_key_of_epoch};
340
341    fn clean_task_state(level_handler: &mut LevelHandler) {
342        for pending_task_id in &level_handler.pending_tasks_ids() {
343            level_handler.remove_task(*pending_task_id);
344        }
345    }
346
347    fn is_l0_to_lbase(compaction_input: &CompactionInput) -> bool {
348        compaction_input
349            .input_levels
350            .iter()
351            .take(compaction_input.input_levels.len() - 1)
352            .all(|i| i.level_idx == 0)
353            && compaction_input
354                .input_levels
355                .iter()
356                .last()
357                .unwrap()
358                .level_idx as usize
359                == compaction_input.target_level
360            && compaction_input.target_level > 0
361    }
362
363    fn is_l0_to_l0(compaction_input: &CompactionInput) -> bool {
364        compaction_input
365            .input_levels
366            .iter()
367            .all(|i| i.level_idx == 0)
368            && compaction_input.target_level == 0
369    }
370
    /// Exercises `pick_compaction` for a non-L0 source level (L1 -> L2) with different filters:
    /// a key range, the default option, an internal table id, and key range plus internal table
    /// id combined. Pending-task state is reset between scenarios.
    #[test]
    fn test_manual_compaction_picker() {
        // Two non-L0 levels and an empty L0.
        // NOTE(review): `generate_table` arguments are presumably
        // (sst id, table prefix, left key, right key, epoch) — confirm against its definition.
        let levels = vec![
            Level {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(0, 1, 0, 100, 1),
                    generate_table(1, 1, 101, 200, 1),
                    generate_table(2, 1, 222, 300, 1),
                ],
                ..Default::default()
            },
            Level {
                level_idx: 2,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(4, 1, 0, 100, 1),
                    generate_table(5, 1, 101, 150, 1),
                    generate_table(6, 1, 151, 201, 1),
                    generate_table(7, 1, 501, 800, 1),
                    generate_table(8, 2, 301, 400, 1),
                ],
                ..Default::default()
            },
        ];
        let mut levels = Levels {
            levels,
            l0: generate_l0_nonoverlapping_sublevels(vec![]),
            ..Default::default()
        };
        let mut levels_handler = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
        ];
        let mut local_stats = LocalPickerStatistic::default();

        {
            // test key_range option
            let option = ManualCompactionOption {
                level: 1,
                key_range: KeyRange {
                    left: Bytes::from(iterator_test_key_of_epoch(1, 0, 1)),
                    right: Bytes::from(iterator_test_key_of_epoch(1, 201, 1)),
                    right_exclusive: false,
                },
                ..Default::default()
            };

            let target_level = option.level + 1;
            let mut picker = ManualCompactionPicker::new(
                Arc::new(RangeOverlapStrategy::default()),
                option,
                target_level,
            );
            let result = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();
            // Mark the picked SSTs as pending so the next scenario has to clean them up.
            result.add_pending_task(0, &mut levels_handler);

            // Two source SSTs are selected, three target SSTs join as overlapping input.
            assert_eq!(2, result.input_levels[0].table_infos.len());
            assert_eq!(3, result.input_levels[1].table_infos.len());
        }

        {
            clean_task_state(&mut levels_handler[1]);
            clean_task_state(&mut levels_handler[2]);

            // test all key range
            // NOTE(review): relies on `ManualCompactionOption::default()` selecting a non-L0
            // level, since L0 is empty here — confirm against its `Default` impl.
            let option = ManualCompactionOption::default();
            let target_level = option.level + 1;
            let mut picker = ManualCompactionPicker::new(
                Arc::new(RangeOverlapStrategy::default()),
                option,
                target_level,
            );
            let result = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();
            result.add_pending_task(0, &mut levels_handler);

            assert_eq!(3, result.input_levels[0].table_infos.len());
            assert_eq!(3, result.input_levels[1].table_infos.len());
        }

        {
            clean_task_state(&mut levels_handler[1]);
            clean_task_state(&mut levels_handler[2]);

            // Give only the second SST of the first level the table ids [1, 2].
            let level_table_info = &mut levels.levels[0].table_infos;
            let table_info_1 = &mut level_table_info[1];
            let mut t_inner = table_info_1.get_inner();
            t_inner.table_ids.resize(2, 0);
            t_inner.table_ids[0] = 1;
            t_inner.table_ids[1] = 2;
            *table_info_1 = t_inner.into();

            // test internal_table_id
            let option = ManualCompactionOption {
                level: 1,
                internal_table_id: HashSet::from([2]),
                ..Default::default()
            };

            let target_level = option.level + 1;
            let mut picker = ManualCompactionPicker::new(
                Arc::new(RangeOverlapStrategy::default()),
                option,
                target_level,
            );

            let result = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();
            result.add_pending_task(0, &mut levels_handler);

            // Only the SST carrying table id 2 is selected from the source level.
            assert_eq!(1, result.input_levels[0].table_infos.len());
            assert_eq!(2, result.input_levels[1].table_infos.len());
        }

        {
            clean_task_state(&mut levels_handler[1]);
            clean_task_state(&mut levels_handler[2]);

            // include all table_info
            // Every SST of the first level now carries table ids [1, 2].
            let level_table_info = &mut levels.levels[0].table_infos;
            for table_info in level_table_info {
                let mut t_inner = table_info.get_inner();
                t_inner.table_ids.resize(2, 0);
                t_inner.table_ids[0] = 1;
                t_inner.table_ids[1] = 2;
                *table_info = t_inner.into();
            }

            // test key range filter first
            // All SSTs match the table id filter, so the key range alone narrows the selection.
            let option = ManualCompactionOption {
                sst_ids: vec![],
                level: 1,
                key_range: KeyRange {
                    left: Bytes::from(iterator_test_key_of_epoch(1, 101, 1)),
                    right: Bytes::from(iterator_test_key_of_epoch(1, 199, 1)),
                    right_exclusive: false,
                },
                internal_table_id: HashSet::from([2]),
            };

            let target_level = option.level + 1;
            let mut picker = ManualCompactionPicker::new(
                Arc::new(RangeOverlapStrategy::default()),
                option,
                target_level,
            );

            let result = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();

            assert_eq!(1, result.input_levels[0].table_infos.len());
            assert_eq!(2, result.input_levels[1].table_infos.len());
        }
    }
533
    /// Builds the fixture shared by the L0 tests: an L0 with three sub-levels (the second one
    /// marked non-overlapping), two non-L0 levels with two SSTs each, and one fresh
    /// `LevelHandler` per level (L0, L1, L2).
    ///
    /// Table ids are rewritten so that SSTs in the first L0 sub-level and in L1 get
    /// `(sst_id % 2) + 1`, while every other SST gets table id 3.
    fn generate_test_levels() -> (Levels, Vec<LevelHandler>) {
        let mut l0 = generate_l0_overlapping_sublevels(vec![
            vec![
                generate_table(5, 1, 0, 500, 2),
                generate_table(6, 2, 600, 1000, 2),
            ],
            vec![
                generate_table(7, 1, 0, 500, 3),
                generate_table(8, 2, 600, 1000, 3),
            ],
            vec![
                generate_table(9, 1, 300, 500, 4),
                generate_table(10, 2, 600, 1000, 4),
            ],
        ]);
        // Set a nonoverlapping sub_level.
        l0.sub_levels[1].level_type = LevelType::Nonoverlapping as _;
        assert_eq!(l0.sub_levels.len(), 3);
        let mut levels = vec![
            Level {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(3, 1, 0, 100, 1),
                    generate_table(4, 2, 2000, 3000, 1),
                ],
                ..Default::default()
            },
            Level {
                level_idx: 2,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(1, 1, 0, 100, 1),
                    generate_table(2, 2, 2000, 3000, 1),
                ],
                ..Default::default()
            },
        ];
        // Set internal_table_ids.
        assert_eq!(levels.len(), 2);
        // `idx` restarts at 0 for each of the two iterators, so "idx == 0" means the first L0
        // sub-level and the first non-L0 level.
        for iter in [l0.sub_levels.iter_mut(), levels.iter_mut()] {
            for (idx, l) in iter.enumerate() {
                for t in &mut l.table_infos {
                    let mut t_inner = t.get_inner();
                    t_inner.table_ids.clear();
                    if idx == 0 {
                        t_inner.table_ids.push(((t.sst_id.inner() % 2) + 1) as _);
                    } else {
                        t_inner.table_ids.push(3);
                    }
                    *t = t_inner.into();
                }
            }
        }
        let levels = Levels {
            levels,
            l0,
            ..Default::default()
        };

        let levels_handler = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
        ];
        (levels, levels_handler)
    }
601
602    fn generate_intra_test_levels() -> (Levels, Vec<LevelHandler>) {
603        let l0 = generate_l0_overlapping_sublevels(vec![]);
604        let levels = vec![Level {
605            level_idx: 1,
606            level_type: LevelType::Nonoverlapping,
607            table_infos: vec![
608                generate_table(1, 1, 0, 100, 1),
609                generate_table(2, 2, 100, 200, 1),
610                generate_table(3, 2, 200, 300, 1),
611                generate_table(4, 2, 300, 400, 1),
612            ],
613            ..Default::default()
614        }];
615        let levels = Levels {
616            levels,
617            l0,
618            ..Default::default()
619        };
620
621        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
622        (levels, levels_handler)
623    }
624
625    #[test]
626    fn test_l0_empty() {
627        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
628        let levels = vec![Level {
629            level_idx: 1,
630            level_type: LevelType::Nonoverlapping,
631            table_infos: vec![],
632            total_file_size: 0,
633            sub_level_id: 0,
634            uncompressed_file_size: 0,
635            ..Default::default()
636        }];
637        let levels = Levels {
638            levels,
639            l0,
640            ..Default::default()
641        };
642        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
643        let option = ManualCompactionOption {
644            sst_ids: vec![1.into()],
645            level: 0,
646            key_range: KeyRange {
647                left: Bytes::default(),
648                right: Bytes::default(),
649                right_exclusive: false,
650            },
651            internal_table_id: HashSet::default(),
652        };
653        let mut picker =
654            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 0);
655        assert!(
656            picker
657                .pick_compaction(
658                    &levels,
659                    &levels_handler,
660                    &mut LocalPickerStatistic::default()
661                )
662                .is_none()
663        );
664    }
665
    /// Covers the L0 source paths without SST id hints: nothing is picked when the target is
    /// L0, all sub-levels are picked for an L0-to-base-level task, and a key range limits which
    /// sub-levels are picked.
    #[test]
    fn test_l0_basic() {
        let (levels, levels_handler) = generate_test_levels();

        // target_level == 0, None
        let option = ManualCompactionOption {
            sst_ids: vec![],
            level: 0,
            key_range: KeyRange {
                left: Bytes::default(),
                right: Bytes::default(),
                right_exclusive: false,
            },
            internal_table_id: HashSet::default(),
        };
        let mut picker = ManualCompactionPicker::new(
            Arc::new(RangeOverlapStrategy::default()),
            option.clone(),
            0,
        );
        let mut local_stats = LocalPickerStatistic::default();
        assert!(
            picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .is_none()
        );

        // pick_l0_to_base_level
        let mut picker =
            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
        // Sub-levels are emitted in reverse order, so reverse the expectation as well.
        let mut expected = [vec![5, 6], vec![7, 8], vec![9, 10]];
        expected.reverse();
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        // Three L0 sub-levels plus one input level for the target level.
        assert_eq!(result.input_levels.len(), 4);
        assert!(is_l0_to_lbase(&result));
        assert_eq!(result.target_level, 1);
        for (l, e) in expected.iter().enumerate().take(3) {
            assert_eq!(
                result.input_levels[l]
                    .table_infos
                    .iter()
                    .map(|s| s.sst_id)
                    .collect_vec(),
                *e
            );
        }
        // Only the first SST of the target level is pulled in as overlapping input.
        assert_eq!(
            result.input_levels[3].table_infos,
            vec![levels.levels[0].table_infos[0].clone()]
        );

        // pick_l0_to_base_level, filtered by key_range
        let option = ManualCompactionOption {
            sst_ids: vec![],
            level: 0,
            key_range: KeyRange {
                left: Bytes::from(iterator_test_key_of_epoch(1, 0, 2)),
                right: Bytes::from(iterator_test_key_of_epoch(1, 200, 2)),
                right_exclusive: false,
            },
            internal_table_id: HashSet::default(),
        };
        let mut picker =
            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
        // With this key range only the first two sub-levels are expected in the input.
        let mut expected = [vec![5, 6], vec![7, 8]];
        expected.reverse();
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        assert_eq!(result.input_levels.len(), 3);
        assert!(is_l0_to_lbase(&result));
        assert_eq!(result.target_level, 1);
        for (l, e) in expected.iter().enumerate().take(2) {
            assert_eq!(
                result.input_levels[l]
                    .table_infos
                    .iter()
                    .map(|s| s.sst_id)
                    .collect_vec(),
                *e
            );
        }
        assert_eq!(
            result.input_levels[2].table_infos,
            vec![levels.levels[0].table_infos[0].clone()]
        );
    }
755
756    #[test]
757    fn test_l0_to_l0_option_sst_ids() {
758        let (levels, levels_handler) = generate_test_levels();
759        // (input_level, sst_id_filter, expected_result_input_level_ssts)
760        let sst_id_filters = vec![
761            (0, vec![6], vec![vec![5, 6]]),
762            (0, vec![7], vec![vec![7, 8]]),
763            (0, vec![9], vec![vec![9, 10]]),
764            (0, vec![6, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
765            (0, vec![8, 9], vec![vec![7, 8], vec![9, 10]]),
766            (0, vec![6, 8, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
767        ];
768        let mut local_stats = LocalPickerStatistic::default();
769        for (input_level, sst_id_filter, expected) in &sst_id_filters {
770            let expected = expected.iter().rev().cloned().collect_vec();
771            let option = ManualCompactionOption {
772                sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
773                level: *input_level as _,
774                key_range: KeyRange {
775                    left: Bytes::default(),
776                    right: Bytes::default(),
777                    right_exclusive: false,
778                },
779                internal_table_id: HashSet::default(),
780            };
781            let mut picker = ManualCompactionPicker::new(
782                Arc::new(RangeOverlapStrategy::default()),
783                option.clone(),
784                // l0 to l0 will ignore target_level
785                input_level + 1,
786            );
787            let result = picker
788                .pick_compaction(&levels, &levels_handler, &mut local_stats)
789                .unwrap();
790            assert!(is_l0_to_l0(&result));
791            assert_eq!(result.input_levels.len(), expected.len());
792            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
793                assert_eq!(
794                    result.input_levels[i]
795                        .table_infos
796                        .iter()
797                        .map(|s| s.sst_id)
798                        .collect_vec(),
799                    *e
800                );
801            }
802        }
803    }
804
805    #[test]
806    fn test_l0_to_lbase_option_internal_table() {
807        let (levels, mut levels_handler) = generate_test_levels();
808        let input_level = 0;
809        let target_level = input_level + 1;
810        let mut local_stats = LocalPickerStatistic::default();
811        {
812            let option = ManualCompactionOption {
813                sst_ids: vec![],
814                level: input_level,
815                key_range: KeyRange {
816                    left: Bytes::default(),
817                    right: Bytes::default(),
818                    right_exclusive: false,
819                },
820                // No matching internal table id.
821                internal_table_id: HashSet::from([100]),
822            };
823            let mut picker = ManualCompactionPicker::new(
824                Arc::new(RangeOverlapStrategy::default()),
825                option,
826                target_level,
827            );
828            assert!(
829                picker
830                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
831                    .is_none()
832            )
833        }
834
835        {
836            let option = ManualCompactionOption {
837                sst_ids: vec![],
838                level: input_level,
839                key_range: KeyRange {
840                    left: Bytes::default(),
841                    right: Bytes::default(),
842                    right_exclusive: false,
843                },
844                // Include all sub level's table ids
845                internal_table_id: HashSet::from([1, 2, 3]),
846            };
847            let mut picker = ManualCompactionPicker::new(
848                Arc::new(RangeOverlapStrategy::default()),
849                option,
850                target_level,
851            );
852            let result = picker
853                .pick_compaction(&levels, &levels_handler, &mut local_stats)
854                .unwrap();
855            assert_eq!(result.input_levels.len(), 4);
856            assert!(is_l0_to_lbase(&result));
857            assert_eq!(result.target_level, 1);
858            assert!(is_l0_to_lbase(&result));
859            assert_eq!(
860                result
861                    .input_levels
862                    .iter()
863                    .take(3)
864                    .flat_map(|s| s.table_infos.clone())
865                    .map(|s| s.sst_id)
866                    .collect_vec(),
867                vec![9, 10, 7, 8, 5, 6]
868            );
869            assert_eq!(
870                result.input_levels[3]
871                    .table_infos
872                    .iter()
873                    .map(|s| s.sst_id)
874                    .collect_vec(),
875                vec![3]
876            );
877        }
878
879        {
880            let option = ManualCompactionOption {
881                sst_ids: vec![],
882                level: input_level,
883                key_range: KeyRange {
884                    left: Bytes::default(),
885                    right: Bytes::default(),
886                    right_exclusive: false,
887                },
888                // Only include bottom sub level's table id
889                internal_table_id: HashSet::from([3]),
890            };
891            let mut picker = ManualCompactionPicker::new(
892                Arc::new(RangeOverlapStrategy::default()),
893                option,
894                target_level,
895            );
896            let result = picker
897                .pick_compaction(&levels, &levels_handler, &mut local_stats)
898                .unwrap();
899            assert_eq!(result.input_levels.len(), 4);
900            assert!(is_l0_to_lbase(&result));
901            assert_eq!(
902                result
903                    .input_levels
904                    .iter()
905                    .take(3)
906                    .flat_map(|s| s.table_infos.clone())
907                    .map(|s| s.sst_id)
908                    .collect_vec(),
909                vec![9, 10, 7, 8, 5, 6]
910            );
911            assert_eq!(
912                result.input_levels[3]
913                    .table_infos
914                    .iter()
915                    .map(|s| s.sst_id)
916                    .collect_vec(),
917                vec![3]
918            );
919            assert_eq!(result.target_level, 1);
920        }
921
922        {
923            let option = ManualCompactionOption {
924                sst_ids: vec![],
925                level: input_level,
926                key_range: KeyRange {
927                    left: Bytes::default(),
928                    right: Bytes::default(),
929                    right_exclusive: false,
930                },
931                // Only include partial top sub level's table id, but the whole top sub level is
932                // picked.
933                internal_table_id: HashSet::from([1]),
934            };
935            let mut picker = ManualCompactionPicker::new(
936                Arc::new(RangeOverlapStrategy::default()),
937                option,
938                target_level,
939            );
940            let result = picker
941                .pick_compaction(&levels, &levels_handler, &mut local_stats)
942                .unwrap();
943            result.add_pending_task(0, &mut levels_handler);
944            assert_eq!(result.input_levels.len(), 2);
945            assert!(is_l0_to_lbase(&result));
946            assert_eq!(result.target_level, 1);
947            assert_eq!(
948                result
949                    .input_levels
950                    .iter()
951                    .take(1)
952                    .flat_map(|s| s.table_infos.clone())
953                    .map(|s| s.sst_id)
954                    .collect_vec(),
955                vec![5, 6]
956            );
957            assert_eq!(
958                result.input_levels[1]
959                    .table_infos
960                    .iter()
961                    .map(|s| s.sst_id)
962                    .collect_vec(),
963                vec![3]
964            );
965
966            // Pick bottom sub level while top sub level is pending
967            let option = ManualCompactionOption {
968                sst_ids: vec![],
969                level: input_level,
970                key_range: KeyRange {
971                    left: Bytes::default(),
972                    right: Bytes::default(),
973                    right_exclusive: false,
974                },
975                // Only include bottom sub level's table id
976                internal_table_id: HashSet::from([3]),
977            };
978            let mut picker = ManualCompactionPicker::new(
979                Arc::new(RangeOverlapStrategy::default()),
980                option,
981                target_level,
982            );
983            // Because top sub-level is pending.
984            assert!(
985                picker
986                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
987                    .is_none()
988            );
989
990            clean_task_state(&mut levels_handler[0]);
991            clean_task_state(&mut levels_handler[1]);
992        }
993    }
994
995    #[test]
996    fn test_ln_to_lnext_option_internal_table() {
997        let (levels, levels_handler) = generate_test_levels();
998        let input_level = 1;
999        let target_level = input_level + 1;
1000        let mut local_stats = LocalPickerStatistic::default();
1001        {
1002            let option = ManualCompactionOption {
1003                sst_ids: vec![],
1004                level: input_level,
1005                key_range: KeyRange {
1006                    left: Bytes::default(),
1007                    right: Bytes::default(),
1008                    right_exclusive: false,
1009                },
1010                // No matching internal table id.
1011                internal_table_id: HashSet::from([100]),
1012            };
1013            let mut picker = ManualCompactionPicker::new(
1014                Arc::new(RangeOverlapStrategy::default()),
1015                option,
1016                target_level,
1017            );
1018            assert!(
1019                picker
1020                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
1021                    .is_none()
1022            )
1023        }
1024
1025        {
1026            let expected_input_level_sst_ids = [vec![4], vec![2]];
1027            let option = ManualCompactionOption {
1028                sst_ids: vec![],
1029                level: input_level,
1030                key_range: KeyRange {
1031                    left: Bytes::default(),
1032                    right: Bytes::default(),
1033                    right_exclusive: false,
1034                },
1035                // Only include partial input level's table id
1036                internal_table_id: HashSet::from([1]),
1037            };
1038            let mut picker = ManualCompactionPicker::new(
1039                Arc::new(RangeOverlapStrategy::default()),
1040                option,
1041                target_level,
1042            );
1043            let result = picker
1044                .pick_compaction(&levels, &levels_handler, &mut local_stats)
1045                .unwrap();
1046            assert_eq!(
1047                result.input_levels.len(),
1048                expected_input_level_sst_ids.len()
1049            );
1050            assert_eq!(result.target_level, target_level);
1051            for (l, e) in expected_input_level_sst_ids
1052                .iter()
1053                .enumerate()
1054                .take(result.input_levels.len())
1055            {
1056                assert_eq!(
1057                    result.input_levels[l]
1058                        .table_infos
1059                        .iter()
1060                        .map(|s| s.sst_id)
1061                        .collect_vec(),
1062                    *e
1063                );
1064            }
1065        }
1066    }
1067
1068    #[test]
1069    fn test_ln_to_lnext_option_sst_ids() {
1070        let (levels, levels_handler) = generate_test_levels();
1071        // (input_level, sst_id_filter, expected_result_input_level_ssts)
1072        let sst_id_filters = vec![
1073            (1, vec![3], vec![vec![3], vec![1]]),
1074            (1, vec![4], vec![vec![4], vec![2]]),
1075            (1, vec![3, 4], vec![vec![3, 4], vec![1, 2]]),
1076        ];
1077        let mut local_stats = LocalPickerStatistic::default();
1078        for (input_level, sst_id_filter, expected) in &sst_id_filters {
1079            let option = ManualCompactionOption {
1080                sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1081                level: *input_level as _,
1082                key_range: KeyRange {
1083                    left: Bytes::default(),
1084                    right: Bytes::default(),
1085                    right_exclusive: false,
1086                },
1087                internal_table_id: HashSet::default(),
1088            };
1089            let mut picker = ManualCompactionPicker::new(
1090                Arc::new(RangeOverlapStrategy::default()),
1091                option.clone(),
1092                input_level + 1,
1093            );
1094            let result = picker
1095                .pick_compaction(&levels, &levels_handler, &mut local_stats)
1096                .unwrap();
1097            assert_eq!(result.input_levels.len(), expected.len());
1098            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1099                assert_eq!(
1100                    result.input_levels[i]
1101                        .table_infos
1102                        .iter()
1103                        .map(|s| s.sst_id)
1104                        .collect_vec(),
1105                    *e
1106                );
1107            }
1108        }
1109    }
1110
1111    #[test]
1112    fn test_ln_to_ln() {
1113        let (levels, levels_handler) = generate_intra_test_levels();
1114        // (input_level, sst_id_filter, expected_result_input_level_ssts)
1115        let sst_id_filters = vec![
1116            (1, vec![1], vec![vec![1], vec![]]),
1117            (1, vec![3], vec![vec![3], vec![]]),
1118            (1, vec![4], vec![vec![4], vec![]]),
1119            (1, vec![3, 4], vec![vec![3, 4], vec![]]),
1120            (1, vec![1, 4], vec![vec![1, 2, 3, 4], vec![]]),
1121            (1, vec![2, 4], vec![vec![2, 3, 4], vec![]]),
1122            (1, vec![1, 3], vec![vec![1, 2, 3], vec![]]),
1123        ];
1124        for (input_level, sst_id_filter, expected) in &sst_id_filters {
1125            let option = ManualCompactionOption {
1126                sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1127                level: *input_level as _,
1128                key_range: KeyRange {
1129                    left: Bytes::default(),
1130                    right: Bytes::default(),
1131                    right_exclusive: false,
1132                },
1133                internal_table_id: HashSet::default(),
1134            };
1135            let mut picker = ManualCompactionPicker::new(
1136                Arc::new(RangeOverlapStrategy::default()),
1137                option.clone(),
1138                *input_level as _,
1139            );
1140            let result = picker
1141                .pick_compaction(
1142                    &levels,
1143                    &levels_handler,
1144                    &mut LocalPickerStatistic::default(),
1145                )
1146                .unwrap();
1147            assert_eq!(result.input_levels.len(), expected.len());
1148            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1149                assert_eq!(
1150                    result.input_levels[i]
1151                        .table_infos
1152                        .iter()
1153                        .map(|s| s.sst_id)
1154                        .collect_vec(),
1155                    *e
1156                );
1157            }
1158        }
1159    }
1160
1161    #[test]
1162    fn test_manual_compaction_selector_l0() {
1163        let config = CompactionConfigBuilder::new().max_level(4).build();
1164        let group_config = CompactionGroup::new(1, config);
1165        let l0 = generate_l0_nonoverlapping_sublevels(vec![
1166            generate_table(0, 1, 0, 500, 1),
1167            generate_table(1, 1, 0, 500, 1),
1168        ]);
1169        assert_eq!(l0.sub_levels.len(), 2);
1170        let levels = vec![
1171            generate_level(1, vec![]),
1172            generate_level(2, vec![]),
1173            generate_level(3, vec![]),
1174            Level {
1175                level_idx: 4,
1176                level_type: LevelType::Nonoverlapping,
1177                table_infos: vec![
1178                    generate_table(2, 1, 0, 100, 1),
1179                    generate_table(3, 1, 101, 200, 1),
1180                    generate_table(4, 1, 222, 300, 1),
1181                ],
1182                ..Default::default()
1183            },
1184        ];
1185        assert_eq!(levels.len(), 4);
1186        let levels = Levels {
1187            levels,
1188            l0,
1189            ..Default::default()
1190        };
1191        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1192        let mut local_stats = LocalSelectorStatistic::default();
1193
1194        // pick_l0_to_sub_level
1195        {
1196            let option = ManualCompactionOption {
1197                sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1198                key_range: KeyRange {
1199                    left: Bytes::default(),
1200                    right: Bytes::default(),
1201                    right_exclusive: false,
1202                },
1203                internal_table_id: HashSet::default(),
1204                level: 0,
1205            };
1206            let mut selector = ManualCompactionSelector::new(option);
1207            let task = selector
1208                .pick_compaction(
1209                    1,
1210                    compaction_selector_context(
1211                        &group_config,
1212                        &levels,
1213                        &BTreeSet::new(),
1214                        &mut levels_handler,
1215                        &mut local_stats,
1216                        &HashMap::default(),
1217                        Arc::new(CompactionDeveloperConfig::default()),
1218                        &Default::default(),
1219                        &HummockVersionStateTableInfo::empty(),
1220                    ),
1221                )
1222                .unwrap();
1223            assert_compaction_task(&task, &levels_handler);
1224            assert_eq!(task.input.input_levels.len(), 2);
1225            assert_eq!(task.input.input_levels[0].level_idx, 0);
1226            assert_eq!(task.input.input_levels[1].level_idx, 0);
1227            assert_eq!(task.input.target_level, 0);
1228        }
1229
1230        for level_handler in &mut levels_handler {
1231            for pending_task_id in &level_handler.pending_tasks_ids() {
1232                level_handler.remove_task(*pending_task_id);
1233            }
1234        }
1235
1236        // pick_l0_to_base_level
1237        {
1238            let option = ManualCompactionOption {
1239                sst_ids: vec![],
1240                key_range: KeyRange {
1241                    left: Bytes::default(),
1242                    right: Bytes::default(),
1243                    right_exclusive: false,
1244                },
1245                internal_table_id: HashSet::default(),
1246                level: 0,
1247            };
1248            let mut selector = ManualCompactionSelector::new(option);
1249            let task = selector
1250                .pick_compaction(
1251                    2,
1252                    compaction_selector_context(
1253                        &group_config,
1254                        &levels,
1255                        &BTreeSet::new(),
1256                        &mut levels_handler,
1257                        &mut local_stats,
1258                        &HashMap::default(),
1259                        Arc::new(CompactionDeveloperConfig::default()),
1260                        &Default::default(),
1261                        &HummockVersionStateTableInfo::empty(),
1262                    ),
1263                )
1264                .unwrap();
1265            assert_compaction_task(&task, &levels_handler);
1266            assert_eq!(task.input.input_levels.len(), 3);
1267            assert_eq!(task.input.input_levels[0].level_idx, 0);
1268            assert_eq!(task.input.input_levels[1].level_idx, 0);
1269            assert_eq!(task.input.input_levels[2].level_idx, 4);
1270            assert_eq!(task.input.target_level, 4);
1271        }
1272    }
1273
1274    /// tests `DynamicLevelSelector::manual_pick_compaction`
1275    #[test]
1276    fn test_manual_compaction_selector() {
1277        let config = CompactionConfigBuilder::new().max_level(4).build();
1278        let group_config = CompactionGroup::new(1, config);
1279        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
1280        assert_eq!(l0.sub_levels.len(), 0);
1281        let levels = vec![
1282            generate_level(1, vec![]),
1283            generate_level(2, vec![]),
1284            generate_level(
1285                3,
1286                vec![
1287                    generate_table(0, 1, 150, 151, 1),
1288                    generate_table(1, 1, 250, 251, 1),
1289                ],
1290            ),
1291            Level {
1292                level_idx: 4,
1293                level_type: LevelType::Nonoverlapping,
1294                table_infos: vec![
1295                    generate_table(2, 1, 0, 100, 1),
1296                    generate_table(3, 1, 101, 200, 1),
1297                    generate_table(4, 1, 222, 300, 1),
1298                    generate_table(5, 1, 333, 400, 1),
1299                    generate_table(6, 1, 444, 500, 1),
1300                    generate_table(7, 1, 555, 600, 1),
1301                ],
1302                ..Default::default()
1303            },
1304        ];
1305        assert_eq!(levels.len(), 4);
1306        let levels = Levels {
1307            levels,
1308            l0,
1309            ..Default::default()
1310        };
1311        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1312        let mut local_stats = LocalSelectorStatistic::default();
1313
1314        // pick l3 -> l4
1315        {
1316            let option = ManualCompactionOption {
1317                sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1318                key_range: KeyRange {
1319                    left: Bytes::default(),
1320                    right: Bytes::default(),
1321                    right_exclusive: false,
1322                },
1323                internal_table_id: HashSet::default(),
1324                level: 3,
1325            };
1326            let mut selector = ManualCompactionSelector::new(option);
1327            let task = selector
1328                .pick_compaction(
1329                    1,
1330                    compaction_selector_context(
1331                        &group_config,
1332                        &levels,
1333                        &BTreeSet::new(),
1334                        &mut levels_handler,
1335                        &mut local_stats,
1336                        &HashMap::default(),
1337                        Arc::new(CompactionDeveloperConfig::default()),
1338                        &Default::default(),
1339                        &HummockVersionStateTableInfo::empty(),
1340                    ),
1341                )
1342                .unwrap();
1343            assert_compaction_task(&task, &levels_handler);
1344            assert_eq!(task.input.input_levels.len(), 2);
1345            assert_eq!(task.input.input_levels[0].level_idx, 3);
1346            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
1347            assert_eq!(task.input.input_levels[1].level_idx, 4);
1348            assert_eq!(task.input.input_levels[1].table_infos.len(), 2);
1349            assert_eq!(task.input.target_level, 4);
1350        }
1351
1352        for level_handler in &mut levels_handler {
1353            for pending_task_id in &level_handler.pending_tasks_ids() {
1354                level_handler.remove_task(*pending_task_id);
1355            }
1356        }
1357
1358        // pick l4 -> l4
1359        {
1360            let option = ManualCompactionOption {
1361                sst_ids: vec![],
1362                key_range: KeyRange {
1363                    left: Bytes::default(),
1364                    right: Bytes::default(),
1365                    right_exclusive: false,
1366                },
1367                internal_table_id: HashSet::default(),
1368                level: 4,
1369            };
1370            let mut selector = ManualCompactionSelector::new(option);
1371            let task = selector
1372                .pick_compaction(
1373                    1,
1374                    compaction_selector_context(
1375                        &group_config,
1376                        &levels,
1377                        &BTreeSet::new(),
1378                        &mut levels_handler,
1379                        &mut local_stats,
1380                        &HashMap::default(),
1381                        Arc::new(CompactionDeveloperConfig::default()),
1382                        &Default::default(),
1383                        &HummockVersionStateTableInfo::empty(),
1384                    ),
1385                )
1386                .unwrap();
1387            assert_compaction_task(&task, &levels_handler);
1388            assert_eq!(task.input.input_levels.len(), 2);
1389            assert_eq!(task.input.input_levels[0].level_idx, 4);
1390            assert_eq!(task.input.input_levels[0].table_infos.len(), 6);
1391            assert_eq!(task.input.input_levels[1].level_idx, 4);
1392            assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
1393            assert_eq!(task.input.target_level, 4);
1394            assert!(matches!(
1395                task.compaction_task_type,
1396                compact_task::TaskType::Manual
1397            ));
1398        }
1399    }
1400}