risingwave_meta/hummock/compaction/picker/
manual_compaction_picker.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_hummock_sdk::HummockSstableId;
20use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
21use risingwave_hummock_sdk::sstable_info::SstableInfo;
22use risingwave_pb::hummock::LevelType;
23
24use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
25use crate::hummock::compaction::overlap_strategy::{
26    OverlapInfo, OverlapStrategy, RangeOverlapInfo,
27};
28use crate::hummock::compaction::selector::ManualCompactionOption;
29use crate::hummock::level_handler::LevelHandler;
30
31pub struct ManualCompactionPicker {
32    overlap_strategy: Arc<dyn OverlapStrategy>,
33    option: ManualCompactionOption,
34    target_level: usize,
35}
36
37impl ManualCompactionPicker {
38    pub fn new(
39        overlap_strategy: Arc<dyn OverlapStrategy>,
40        option: ManualCompactionOption,
41        target_level: usize,
42    ) -> Self {
43        Self {
44            overlap_strategy,
45            option,
46            target_level,
47        }
48    }
49
50    fn pick_l0_to_sub_level(
51        &self,
52        l0: &OverlappingLevel,
53        level_handlers: &[LevelHandler],
54    ) -> Option<CompactionInput> {
55        assert_eq!(self.option.level, 0);
56        let mut input_levels = vec![];
57        let mut sub_level_id = 0;
58        let mut start_idx = None;
59        let mut end_idx = None;
60        // Decides the range of sub levels as input.
61        // We need pick consecutive sub_levels. See #5217.
62        for (idx, level) in l0.sub_levels.iter().enumerate() {
63            if !self.filter_level_by_option(level) {
64                continue;
65            }
66            if level_handlers[0].is_level_pending_compact(level) {
67                return None;
68            }
69            // Pick this sub_level.
70            if start_idx.is_none() {
71                sub_level_id = level.sub_level_id;
72                start_idx = Some(idx as u64);
73                end_idx = start_idx;
74            } else {
75                end_idx = Some(idx as u64);
76            }
77        }
78        let (start_idx, end_idx) = match (start_idx, end_idx) {
79            (Some(start_idx), Some(end_idx)) => (start_idx, end_idx),
80            _ => {
81                return None;
82            }
83        };
84        // Construct input.
85        for level in l0
86            .sub_levels
87            .iter()
88            .skip(start_idx as usize)
89            .take((end_idx - start_idx + 1) as usize)
90        {
91            input_levels.push(InputLevel {
92                level_idx: 0,
93                level_type: level.level_type,
94                table_infos: level.table_infos.clone(),
95            });
96        }
97        if input_levels.is_empty() {
98            return None;
99        }
100        input_levels.reverse();
101        Some(CompactionInput {
102            input_levels,
103            target_level: 0,
104            target_sub_level_id: sub_level_id,
105            ..Default::default()
106        })
107    }
108
109    fn pick_l0_to_base_level(
110        &self,
111        levels: &Levels,
112        level_handlers: &[LevelHandler],
113    ) -> Option<CompactionInput> {
114        assert!(self.option.level == 0 && self.target_level > 0);
115        for l in 1..self.target_level {
116            assert!(levels.levels[l - 1].table_infos.is_empty());
117        }
118        let l0 = &levels.l0;
119        let mut input_levels = vec![];
120        let mut max_sub_level_idx = usize::MAX;
121        let mut info = self.overlap_strategy.create_overlap_info();
122        // Decides the range of sub levels as input.
123        for (idx, level) in l0.sub_levels.iter().enumerate() {
124            if !self.filter_level_by_option(level) {
125                continue;
126            }
127            if level_handlers[0].is_level_pending_compact(level) {
128                return None;
129            }
130
131            // Pick this sub_level.
132            max_sub_level_idx = idx;
133        }
134        if max_sub_level_idx == usize::MAX {
135            return None;
136        }
137        // Construct input.
138        for idx in 0..=max_sub_level_idx {
139            for table in &l0.sub_levels[idx].table_infos {
140                info.update(&table.key_range);
141            }
142            input_levels.push(InputLevel {
143                level_idx: 0,
144                level_type: l0.sub_levels[idx].level_type,
145                table_infos: l0.sub_levels[idx].table_infos.clone(),
146            })
147        }
148        let target_input_ssts_range =
149            info.check_multiple_overlap(&levels.levels[self.target_level - 1].table_infos);
150        let target_input_ssts = if target_input_ssts_range.is_empty() {
151            vec![]
152        } else {
153            levels.levels[self.target_level - 1].table_infos[target_input_ssts_range].to_vec()
154        };
155        if target_input_ssts
156            .iter()
157            .any(|table| level_handlers[self.target_level].is_pending_compact(&table.sst_id))
158        {
159            return None;
160        }
161        if input_levels.is_empty() {
162            return None;
163        }
164        input_levels.reverse();
165        input_levels.push(InputLevel {
166            level_idx: self.target_level as u32,
167            level_type: LevelType::Nonoverlapping,
168            table_infos: target_input_ssts,
169        });
170
171        Some(CompactionInput {
172            input_levels,
173            target_level: self.target_level,
174            target_sub_level_id: 0,
175            ..Default::default()
176        })
177    }
178
179    /// Returns false if the given `sst` is rejected by filter defined by `option`.
180    /// Otherwise returns true.
181    fn filter_level_by_option(&self, level: &Level) -> bool {
182        let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
183        hint_sst_ids.extend(self.option.sst_ids.iter());
184        if self
185            .overlap_strategy
186            .check_overlap_with_range(&self.option.key_range, &level.table_infos)
187            .is_empty()
188        {
189            return false;
190        }
191        if !hint_sst_ids.is_empty()
192            && !level
193                .table_infos
194                .iter()
195                .any(|t| hint_sst_ids.contains(&t.sst_id))
196        {
197            return false;
198        }
199        if !self.option.internal_table_id.is_empty()
200            && !level.table_infos.iter().any(|sst_info| {
201                sst_info
202                    .table_ids
203                    .iter()
204                    .any(|t| self.option.internal_table_id.contains(t))
205            })
206        {
207            return false;
208        }
209        true
210    }
211}
212
213impl CompactionPicker for ManualCompactionPicker {
214    fn pick_compaction(
215        &mut self,
216        levels: &Levels,
217        level_handlers: &[LevelHandler],
218        _stats: &mut LocalPickerStatistic,
219    ) -> Option<CompactionInput> {
220        if self.option.exclusive
221            && level_handlers
222                .iter()
223                .any(|level_handler| level_handler.pending_file_count() > 0)
224        {
225            return None;
226        }
227        if self.option.level == 0 {
228            if !self.option.sst_ids.is_empty() {
229                return self.pick_l0_to_sub_level(&levels.l0, level_handlers);
230            } else if self.target_level > 0 {
231                return self.pick_l0_to_base_level(levels, level_handlers);
232            } else {
233                return None;
234            }
235        }
236        let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
237        hint_sst_ids.extend(self.option.sst_ids.iter());
238        let mut range_overlap_info = RangeOverlapInfo::default();
239        range_overlap_info.update(&self.option.key_range);
240        let level = self.option.level;
241        let target_level = self.target_level;
242        assert!(
243            self.option.level == self.target_level || self.option.level + 1 == self.target_level
244        );
245        // We either include all `select_input_ssts` as input, or return None.
246        let mut select_input_ssts: Vec<SstableInfo> = levels
247            .get_level(self.option.level)
248            .table_infos
249            .iter()
250            .filter(|sst_info| hint_sst_ids.is_empty() || hint_sst_ids.contains(&sst_info.sst_id))
251            .filter(|sst_info| range_overlap_info.check_overlap(sst_info))
252            .filter(|sst_info| {
253                if self.option.internal_table_id.is_empty() {
254                    return true;
255                }
256
257                // to filter sst_file by table_id
258                for table_id in &sst_info.table_ids {
259                    if self.option.internal_table_id.contains(table_id) {
260                        return true;
261                    }
262                }
263                false
264            })
265            .cloned()
266            .collect();
267        if select_input_ssts.is_empty() {
268            return None;
269        }
270        let target_input_ssts = if target_level == level {
271            // For intra level compaction, input SSTs must be consecutive.
272            let (left, _) = levels
273                .get_level(level)
274                .table_infos
275                .iter()
276                .find_position(|p| p.sst_id == select_input_ssts.first().unwrap().sst_id)
277                .unwrap();
278            let (right, _) = levels
279                .get_level(level)
280                .table_infos
281                .iter()
282                .find_position(|p| p.sst_id == select_input_ssts.last().unwrap().sst_id)
283                .unwrap();
284            select_input_ssts = levels.get_level(level).table_infos[left..=right].to_vec();
285            vec![]
286        } else {
287            self.overlap_strategy.check_base_level_overlap(
288                &select_input_ssts,
289                &levels.get_level(target_level).table_infos,
290            )
291        };
292        if select_input_ssts
293            .iter()
294            .any(|table| level_handlers[level].is_pending_compact(&table.sst_id))
295        {
296            return None;
297        }
298        if target_input_ssts
299            .iter()
300            .any(|table| level_handlers[target_level].is_pending_compact(&table.sst_id))
301        {
302            return None;
303        }
304
305        Some(CompactionInput {
306            select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
307            target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
308            total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
309            input_levels: vec![
310                InputLevel {
311                    level_idx: level as u32,
312                    level_type: levels.levels[level - 1].level_type,
313                    table_infos: select_input_ssts,
314                },
315                InputLevel {
316                    level_idx: target_level as u32,
317                    level_type: levels.levels[target_level - 1].level_type,
318                    table_infos: target_input_ssts,
319                },
320            ],
321            target_level,
322            ..Default::default()
323        })
324    }
325}
326
327#[cfg(test)]
328pub mod tests {
329    use std::collections::{BTreeSet, HashMap};
330
331    use bytes::Bytes;
332    use risingwave_hummock_sdk::key_range::KeyRange;
333    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
334    use risingwave_pb::hummock::compact_task;
335
336    use super::*;
337    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
338    use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
339    use crate::hummock::compaction::selector::tests::{
340        assert_compaction_task, generate_l0_nonoverlapping_sublevels,
341        generate_l0_overlapping_sublevels, generate_level, generate_table,
342    };
343    use crate::hummock::compaction::selector::{CompactionSelector, ManualCompactionSelector};
344    use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
345    use crate::hummock::model::CompactionGroup;
346    use crate::hummock::test_utils::{compaction_selector_context, iterator_test_key_of_epoch};
347
348    fn clean_task_state(level_handler: &mut LevelHandler) {
349        for pending_task_id in &level_handler.pending_tasks_ids() {
350            level_handler.remove_task(*pending_task_id);
351        }
352    }
353
354    fn is_l0_to_lbase(compaction_input: &CompactionInput) -> bool {
355        compaction_input
356            .input_levels
357            .iter()
358            .take(compaction_input.input_levels.len() - 1)
359            .all(|i| i.level_idx == 0)
360            && compaction_input
361                .input_levels
362                .iter()
363                .last()
364                .unwrap()
365                .level_idx as usize
366                == compaction_input.target_level
367            && compaction_input.target_level > 0
368    }
369
370    fn is_l0_to_l0(compaction_input: &CompactionInput) -> bool {
371        compaction_input
372            .input_levels
373            .iter()
374            .all(|i| i.level_idx == 0)
375            && compaction_input.target_level == 0
376    }
377
378    #[test]
379    fn test_manual_compaction_picker() {
380        let levels = vec![
381            Level {
382                level_idx: 1,
383                level_type: LevelType::Nonoverlapping,
384                table_infos: vec![
385                    generate_table(0, 1, 0, 100, 1),
386                    generate_table(1, 1, 101, 200, 1),
387                    generate_table(2, 1, 222, 300, 1),
388                ],
389                ..Default::default()
390            },
391            Level {
392                level_idx: 2,
393                level_type: LevelType::Nonoverlapping,
394                table_infos: vec![
395                    generate_table(4, 1, 0, 100, 1),
396                    generate_table(5, 1, 101, 150, 1),
397                    generate_table(6, 1, 151, 201, 1),
398                    generate_table(7, 1, 501, 800, 1),
399                    generate_table(8, 2, 301, 400, 1),
400                ],
401                ..Default::default()
402            },
403        ];
404        let mut levels = Levels {
405            levels,
406            l0: generate_l0_nonoverlapping_sublevels(vec![]),
407            ..Default::default()
408        };
409        let mut levels_handler = vec![
410            LevelHandler::new(0),
411            LevelHandler::new(1),
412            LevelHandler::new(2),
413        ];
414        let mut local_stats = LocalPickerStatistic::default();
415
416        {
417            // test key_range option
418            let option = ManualCompactionOption {
419                level: 1,
420                key_range: KeyRange {
421                    left: Bytes::from(iterator_test_key_of_epoch(1, 0, 1)),
422                    right: Bytes::from(iterator_test_key_of_epoch(1, 201, 1)),
423                    right_exclusive: false,
424                },
425                ..Default::default()
426            };
427
428            let target_level = option.level + 1;
429            let mut picker = ManualCompactionPicker::new(
430                Arc::new(RangeOverlapStrategy::default()),
431                option,
432                target_level,
433            );
434            let result = picker
435                .pick_compaction(&levels, &levels_handler, &mut local_stats)
436                .unwrap();
437            result.add_pending_task(0, &mut levels_handler);
438
439            assert_eq!(2, result.input_levels[0].table_infos.len());
440            assert_eq!(3, result.input_levels[1].table_infos.len());
441        }
442
443        {
444            clean_task_state(&mut levels_handler[1]);
445            clean_task_state(&mut levels_handler[2]);
446
447            // test all key range
448            let option = ManualCompactionOption::default();
449            let target_level = option.level + 1;
450            let mut picker = ManualCompactionPicker::new(
451                Arc::new(RangeOverlapStrategy::default()),
452                option,
453                target_level,
454            );
455            let result = picker
456                .pick_compaction(&levels, &levels_handler, &mut local_stats)
457                .unwrap();
458            result.add_pending_task(0, &mut levels_handler);
459
460            assert_eq!(3, result.input_levels[0].table_infos.len());
461            assert_eq!(3, result.input_levels[1].table_infos.len());
462        }
463
464        {
465            clean_task_state(&mut levels_handler[1]);
466            clean_task_state(&mut levels_handler[2]);
467
468            let level_table_info = &mut levels.levels[0].table_infos;
469            let table_info_1 = &mut level_table_info[1];
470            let mut t_inner = table_info_1.get_inner();
471            t_inner.table_ids.resize(2, 0.into());
472            t_inner.table_ids[0] = 1.into();
473            t_inner.table_ids[1] = 2.into();
474            *table_info_1 = t_inner.into();
475
476            // test internal_table_id
477            let option = ManualCompactionOption {
478                level: 1,
479                internal_table_id: HashSet::from([2.into()]),
480                ..Default::default()
481            };
482
483            let target_level = option.level + 1;
484            let mut picker = ManualCompactionPicker::new(
485                Arc::new(RangeOverlapStrategy::default()),
486                option,
487                target_level,
488            );
489
490            let result = picker
491                .pick_compaction(&levels, &levels_handler, &mut local_stats)
492                .unwrap();
493            result.add_pending_task(0, &mut levels_handler);
494
495            assert_eq!(1, result.input_levels[0].table_infos.len());
496            assert_eq!(2, result.input_levels[1].table_infos.len());
497        }
498
499        {
500            clean_task_state(&mut levels_handler[1]);
501            clean_task_state(&mut levels_handler[2]);
502
503            // include all table_info
504            let level_table_info = &mut levels.levels[0].table_infos;
505            for table_info in level_table_info {
506                let mut t_inner = table_info.get_inner();
507                t_inner.table_ids.resize(2, 0.into());
508                t_inner.table_ids[0] = 1.into();
509                t_inner.table_ids[1] = 2.into();
510                *table_info = t_inner.into();
511            }
512
513            // test key range filter first
514            let option = ManualCompactionOption {
515                sst_ids: vec![],
516                level: 1,
517                key_range: KeyRange {
518                    left: Bytes::from(iterator_test_key_of_epoch(1, 101, 1)),
519                    right: Bytes::from(iterator_test_key_of_epoch(1, 199, 1)),
520                    right_exclusive: false,
521                },
522                internal_table_id: HashSet::from([2.into()]),
523                target_level: None,
524                exclusive: false,
525            };
526
527            let target_level = option.level + 1;
528            let mut picker = ManualCompactionPicker::new(
529                Arc::new(RangeOverlapStrategy::default()),
530                option,
531                target_level,
532            );
533
534            let result = picker
535                .pick_compaction(&levels, &levels_handler, &mut local_stats)
536                .unwrap();
537
538            assert_eq!(1, result.input_levels[0].table_infos.len());
539            assert_eq!(2, result.input_levels[1].table_infos.len());
540        }
541    }
542
543    #[test]
544    fn test_manual_compaction_exclusive_blocked_by_pending() {
545        let (levels, mut levels_handler) = generate_test_levels();
546        let option = ManualCompactionOption {
547            exclusive: true,
548            ..Default::default()
549        };
550        let target_level = option.level + 1;
551        let mut picker = ManualCompactionPicker::new(
552            Arc::new(RangeOverlapStrategy::default()),
553            option,
554            target_level,
555        );
556
557        let pending_sst_id = levels.levels[0].table_infos[0].sst_id;
558        levels_handler[1].test_add_pending_sst(pending_sst_id, 1);
559
560        assert!(
561            picker
562                .pick_compaction(
563                    &levels,
564                    &levels_handler,
565                    &mut LocalPickerStatistic::default()
566                )
567                .is_none()
568        );
569    }
570
571    fn generate_test_levels() -> (Levels, Vec<LevelHandler>) {
572        let mut l0 = generate_l0_overlapping_sublevels(vec![
573            vec![
574                generate_table(5, 1, 0, 500, 2),
575                generate_table(6, 2, 600, 1000, 2),
576            ],
577            vec![
578                generate_table(7, 1, 0, 500, 3),
579                generate_table(8, 2, 600, 1000, 3),
580            ],
581            vec![
582                generate_table(9, 1, 300, 500, 4),
583                generate_table(10, 2, 600, 1000, 4),
584            ],
585        ]);
586        // Set a nonoverlapping sub_level.
587        l0.sub_levels[1].level_type = LevelType::Nonoverlapping as _;
588        assert_eq!(l0.sub_levels.len(), 3);
589        let mut levels = vec![
590            Level {
591                level_idx: 1,
592                level_type: LevelType::Nonoverlapping,
593                table_infos: vec![
594                    generate_table(3, 1, 0, 100, 1),
595                    generate_table(4, 2, 2000, 3000, 1),
596                ],
597                ..Default::default()
598            },
599            Level {
600                level_idx: 2,
601                level_type: LevelType::Nonoverlapping,
602                table_infos: vec![
603                    generate_table(1, 1, 0, 100, 1),
604                    generate_table(2, 2, 2000, 3000, 1),
605                ],
606                ..Default::default()
607            },
608        ];
609        // Set internal_table_ids.
610        assert_eq!(levels.len(), 2);
611        for iter in [l0.sub_levels.iter_mut(), levels.iter_mut()] {
612            for (idx, l) in iter.enumerate() {
613                for t in &mut l.table_infos {
614                    let mut t_inner = t.get_inner();
615                    t_inner.table_ids.clear();
616                    if idx == 0 {
617                        t_inner
618                            .table_ids
619                            .push((((t.sst_id.as_raw_id() % 2) + 1) as u32).into());
620                    } else {
621                        t_inner.table_ids.push(3.into());
622                    }
623                    *t = t_inner.into();
624                }
625            }
626        }
627        let levels = Levels {
628            levels,
629            l0,
630            ..Default::default()
631        };
632
633        let levels_handler = vec![
634            LevelHandler::new(0),
635            LevelHandler::new(1),
636            LevelHandler::new(2),
637        ];
638        (levels, levels_handler)
639    }
640
641    fn generate_intra_test_levels() -> (Levels, Vec<LevelHandler>) {
642        let l0 = generate_l0_overlapping_sublevels(vec![]);
643        let levels = vec![Level {
644            level_idx: 1,
645            level_type: LevelType::Nonoverlapping,
646            table_infos: vec![
647                generate_table(1, 1, 0, 100, 1),
648                generate_table(2, 2, 100, 200, 1),
649                generate_table(3, 2, 200, 300, 1),
650                generate_table(4, 2, 300, 400, 1),
651            ],
652            ..Default::default()
653        }];
654        let levels = Levels {
655            levels,
656            l0,
657            ..Default::default()
658        };
659
660        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
661        (levels, levels_handler)
662    }
663
664    #[test]
665    fn test_l0_empty() {
666        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
667        let levels = vec![Level {
668            level_idx: 1,
669            level_type: LevelType::Nonoverlapping,
670            table_infos: vec![],
671            total_file_size: 0,
672            sub_level_id: 0,
673            uncompressed_file_size: 0,
674            ..Default::default()
675        }];
676        let levels = Levels {
677            levels,
678            l0,
679            ..Default::default()
680        };
681        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
682        let option = ManualCompactionOption {
683            sst_ids: vec![1.into()],
684            level: 0,
685            key_range: KeyRange {
686                left: Bytes::default(),
687                right: Bytes::default(),
688                right_exclusive: false,
689            },
690            internal_table_id: HashSet::default(),
691            target_level: None,
692            exclusive: false,
693        };
694        let mut picker =
695            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 0);
696        assert!(
697            picker
698                .pick_compaction(
699                    &levels,
700                    &levels_handler,
701                    &mut LocalPickerStatistic::default()
702                )
703                .is_none()
704        );
705    }
706
707    #[test]
708    fn test_l0_basic() {
709        let (levels, levels_handler) = generate_test_levels();
710
711        // target_level == 0, None
712        let option = ManualCompactionOption {
713            sst_ids: vec![],
714            level: 0,
715            key_range: KeyRange {
716                left: Bytes::default(),
717                right: Bytes::default(),
718                right_exclusive: false,
719            },
720            internal_table_id: HashSet::default(),
721            target_level: None,
722            exclusive: false,
723        };
724        let mut picker = ManualCompactionPicker::new(
725            Arc::new(RangeOverlapStrategy::default()),
726            option.clone(),
727            0,
728        );
729        let mut local_stats = LocalPickerStatistic::default();
730        assert!(
731            picker
732                .pick_compaction(&levels, &levels_handler, &mut local_stats)
733                .is_none()
734        );
735
736        // pick_l0_to_base_level
737        let mut picker =
738            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
739        let mut expected = [vec![5, 6], vec![7, 8], vec![9, 10]];
740        expected.reverse();
741        let result = picker
742            .pick_compaction(&levels, &levels_handler, &mut local_stats)
743            .unwrap();
744        assert_eq!(result.input_levels.len(), 4);
745        assert!(is_l0_to_lbase(&result));
746        assert_eq!(result.target_level, 1);
747        for (l, e) in expected.iter().enumerate().take(3) {
748            assert_eq!(
749                result.input_levels[l]
750                    .table_infos
751                    .iter()
752                    .map(|s| s.sst_id)
753                    .collect_vec(),
754                *e
755            );
756        }
757        assert_eq!(
758            result.input_levels[3].table_infos,
759            vec![levels.levels[0].table_infos[0].clone()]
760        );
761
762        // pick_l0_to_base_level, filtered by key_range
763        let option = ManualCompactionOption {
764            sst_ids: vec![],
765            level: 0,
766            key_range: KeyRange {
767                left: Bytes::from(iterator_test_key_of_epoch(1, 0, 2)),
768                right: Bytes::from(iterator_test_key_of_epoch(1, 200, 2)),
769                right_exclusive: false,
770            },
771            internal_table_id: HashSet::default(),
772            target_level: None,
773            exclusive: false,
774        };
775        let mut picker =
776            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
777        let mut expected = [vec![5, 6], vec![7, 8]];
778        expected.reverse();
779        let result = picker
780            .pick_compaction(&levels, &levels_handler, &mut local_stats)
781            .unwrap();
782        assert_eq!(result.input_levels.len(), 3);
783        assert!(is_l0_to_lbase(&result));
784        assert_eq!(result.target_level, 1);
785        for (l, e) in expected.iter().enumerate().take(2) {
786            assert_eq!(
787                result.input_levels[l]
788                    .table_infos
789                    .iter()
790                    .map(|s| s.sst_id)
791                    .collect_vec(),
792                *e
793            );
794        }
795        assert_eq!(
796            result.input_levels[2].table_infos,
797            vec![levels.levels[0].table_infos[0].clone()]
798        );
799    }
800
801    #[test]
802    fn test_l0_to_l0_option_sst_ids() {
803        let (levels, levels_handler) = generate_test_levels();
804        // (input_level, sst_id_filter, expected_result_input_level_ssts)
805        let sst_id_filters = vec![
806            (0, vec![6], vec![vec![5, 6]]),
807            (0, vec![7], vec![vec![7, 8]]),
808            (0, vec![9], vec![vec![9, 10]]),
809            (0, vec![6, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
810            (0, vec![8, 9], vec![vec![7, 8], vec![9, 10]]),
811            (0, vec![6, 8, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
812        ];
813        let mut local_stats = LocalPickerStatistic::default();
814        for (input_level, sst_id_filter, expected) in &sst_id_filters {
815            let expected = expected.iter().rev().cloned().collect_vec();
816            let option = ManualCompactionOption {
817                sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
818                level: *input_level as _,
819                key_range: KeyRange {
820                    left: Bytes::default(),
821                    right: Bytes::default(),
822                    right_exclusive: false,
823                },
824                internal_table_id: HashSet::default(),
825                target_level: None,
826                exclusive: false,
827            };
828            let mut picker = ManualCompactionPicker::new(
829                Arc::new(RangeOverlapStrategy::default()),
830                option.clone(),
831                // l0 to l0 will ignore target_level
832                input_level + 1,
833            );
834            let result = picker
835                .pick_compaction(&levels, &levels_handler, &mut local_stats)
836                .unwrap();
837            assert!(is_l0_to_l0(&result));
838            assert_eq!(result.input_levels.len(), expected.len());
839            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
840                assert_eq!(
841                    result.input_levels[i]
842                        .table_infos
843                        .iter()
844                        .map(|s| s.sst_id)
845                        .collect_vec(),
846                    *e
847                );
848            }
849        }
850    }
851
852    #[test]
853    fn test_l0_to_lbase_option_internal_table() {
854        let (levels, mut levels_handler) = generate_test_levels();
855        let input_level = 0;
856        let target_level = input_level + 1;
857        let mut local_stats = LocalPickerStatistic::default();
858        {
859            let option = ManualCompactionOption {
860                sst_ids: vec![],
861                level: input_level,
862                key_range: KeyRange {
863                    left: Bytes::default(),
864                    right: Bytes::default(),
865                    right_exclusive: false,
866                },
867                // No matching internal table id.
868                internal_table_id: HashSet::from([100.into()]),
869                target_level: None,
870                exclusive: false,
871            };
872            let mut picker = ManualCompactionPicker::new(
873                Arc::new(RangeOverlapStrategy::default()),
874                option,
875                target_level,
876            );
877            assert!(
878                picker
879                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
880                    .is_none()
881            )
882        }
883
884        {
885            let option = ManualCompactionOption {
886                sst_ids: vec![],
887                level: input_level,
888                key_range: KeyRange {
889                    left: Bytes::default(),
890                    right: Bytes::default(),
891                    right_exclusive: false,
892                },
893                // Include all sub level's table ids
894                internal_table_id: HashSet::from([1.into(), 2.into(), 3.into()]),
895                target_level: None,
896                exclusive: false,
897            };
898            let mut picker = ManualCompactionPicker::new(
899                Arc::new(RangeOverlapStrategy::default()),
900                option,
901                target_level,
902            );
903            let result = picker
904                .pick_compaction(&levels, &levels_handler, &mut local_stats)
905                .unwrap();
906            assert_eq!(result.input_levels.len(), 4);
907            assert!(is_l0_to_lbase(&result));
908            assert_eq!(result.target_level, 1);
909            assert!(is_l0_to_lbase(&result));
910            assert_eq!(
911                result
912                    .input_levels
913                    .iter()
914                    .take(3)
915                    .flat_map(|s| s.table_infos.clone())
916                    .map(|s| s.sst_id)
917                    .collect_vec(),
918                vec![9, 10, 7, 8, 5, 6]
919            );
920            assert_eq!(
921                result.input_levels[3]
922                    .table_infos
923                    .iter()
924                    .map(|s| s.sst_id)
925                    .collect_vec(),
926                vec![3]
927            );
928        }
929
930        {
931            let option = ManualCompactionOption {
932                sst_ids: vec![],
933                level: input_level,
934                key_range: KeyRange {
935                    left: Bytes::default(),
936                    right: Bytes::default(),
937                    right_exclusive: false,
938                },
939                // Only include bottom sub level's table id
940                internal_table_id: HashSet::from([3.into()]),
941                target_level: None,
942                exclusive: false,
943            };
944            let mut picker = ManualCompactionPicker::new(
945                Arc::new(RangeOverlapStrategy::default()),
946                option,
947                target_level,
948            );
949            let result = picker
950                .pick_compaction(&levels, &levels_handler, &mut local_stats)
951                .unwrap();
952            assert_eq!(result.input_levels.len(), 4);
953            assert!(is_l0_to_lbase(&result));
954            assert_eq!(
955                result
956                    .input_levels
957                    .iter()
958                    .take(3)
959                    .flat_map(|s| s.table_infos.clone())
960                    .map(|s| s.sst_id)
961                    .collect_vec(),
962                vec![9, 10, 7, 8, 5, 6]
963            );
964            assert_eq!(
965                result.input_levels[3]
966                    .table_infos
967                    .iter()
968                    .map(|s| s.sst_id)
969                    .collect_vec(),
970                vec![3]
971            );
972            assert_eq!(result.target_level, 1);
973        }
974
975        {
976            let option = ManualCompactionOption {
977                sst_ids: vec![],
978                level: input_level,
979                key_range: KeyRange {
980                    left: Bytes::default(),
981                    right: Bytes::default(),
982                    right_exclusive: false,
983                },
984                // Only include partial top sub level's table id, but the whole top sub level is
985                // picked.
986                internal_table_id: HashSet::from([1.into()]),
987                target_level: None,
988                exclusive: false,
989            };
990            let mut picker = ManualCompactionPicker::new(
991                Arc::new(RangeOverlapStrategy::default()),
992                option,
993                target_level,
994            );
995            let result = picker
996                .pick_compaction(&levels, &levels_handler, &mut local_stats)
997                .unwrap();
998            result.add_pending_task(0, &mut levels_handler);
999            assert_eq!(result.input_levels.len(), 2);
1000            assert!(is_l0_to_lbase(&result));
1001            assert_eq!(result.target_level, 1);
1002            assert_eq!(
1003                result
1004                    .input_levels
1005                    .iter()
1006                    .take(1)
1007                    .flat_map(|s| s.table_infos.clone())
1008                    .map(|s| s.sst_id)
1009                    .collect_vec(),
1010                vec![5, 6]
1011            );
1012            assert_eq!(
1013                result.input_levels[1]
1014                    .table_infos
1015                    .iter()
1016                    .map(|s| s.sst_id)
1017                    .collect_vec(),
1018                vec![3]
1019            );
1020
1021            // Pick bottom sub level while top sub level is pending
1022            let option = ManualCompactionOption {
1023                sst_ids: vec![],
1024                level: input_level,
1025                key_range: KeyRange {
1026                    left: Bytes::default(),
1027                    right: Bytes::default(),
1028                    right_exclusive: false,
1029                },
1030                // Only include bottom sub level's table id
1031                internal_table_id: HashSet::from([3.into()]),
1032                target_level: None,
1033                exclusive: false,
1034            };
1035            let mut picker = ManualCompactionPicker::new(
1036                Arc::new(RangeOverlapStrategy::default()),
1037                option,
1038                target_level,
1039            );
1040            // Because top sub-level is pending.
1041            assert!(
1042                picker
1043                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
1044                    .is_none()
1045            );
1046
1047            clean_task_state(&mut levels_handler[0]);
1048            clean_task_state(&mut levels_handler[1]);
1049        }
1050    }
1051
1052    #[test]
1053    fn test_ln_to_lnext_option_internal_table() {
1054        let (levels, levels_handler) = generate_test_levels();
1055        let input_level = 1;
1056        let target_level = input_level + 1;
1057        let mut local_stats = LocalPickerStatistic::default();
1058        {
1059            let option = ManualCompactionOption {
1060                sst_ids: vec![],
1061                level: input_level,
1062                key_range: KeyRange {
1063                    left: Bytes::default(),
1064                    right: Bytes::default(),
1065                    right_exclusive: false,
1066                },
1067                // No matching internal table id.
1068                internal_table_id: HashSet::from([100.into()]),
1069                target_level: None,
1070                exclusive: false,
1071            };
1072            let mut picker = ManualCompactionPicker::new(
1073                Arc::new(RangeOverlapStrategy::default()),
1074                option,
1075                target_level,
1076            );
1077            assert!(
1078                picker
1079                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
1080                    .is_none()
1081            )
1082        }
1083
1084        {
1085            let expected_input_level_sst_ids = [vec![4], vec![2]];
1086            let option = ManualCompactionOption {
1087                sst_ids: vec![],
1088                level: input_level,
1089                key_range: KeyRange {
1090                    left: Bytes::default(),
1091                    right: Bytes::default(),
1092                    right_exclusive: false,
1093                },
1094                // Only include partial input level's table id
1095                internal_table_id: HashSet::from([1.into()]),
1096                target_level: None,
1097                exclusive: false,
1098            };
1099            let mut picker = ManualCompactionPicker::new(
1100                Arc::new(RangeOverlapStrategy::default()),
1101                option,
1102                target_level,
1103            );
1104            let result = picker
1105                .pick_compaction(&levels, &levels_handler, &mut local_stats)
1106                .unwrap();
1107            assert_eq!(
1108                result.input_levels.len(),
1109                expected_input_level_sst_ids.len()
1110            );
1111            assert_eq!(result.target_level, target_level);
1112            for (l, e) in expected_input_level_sst_ids
1113                .iter()
1114                .enumerate()
1115                .take(result.input_levels.len())
1116            {
1117                assert_eq!(
1118                    result.input_levels[l]
1119                        .table_infos
1120                        .iter()
1121                        .map(|s| s.sst_id)
1122                        .collect_vec(),
1123                    *e
1124                );
1125            }
1126        }
1127    }
1128
1129    #[test]
1130    fn test_ln_to_lnext_option_sst_ids() {
1131        let (levels, levels_handler) = generate_test_levels();
1132        // (input_level, sst_id_filter, expected_result_input_level_ssts)
1133        let sst_id_filters = vec![
1134            (1, vec![3], vec![vec![3], vec![1]]),
1135            (1, vec![4], vec![vec![4], vec![2]]),
1136            (1, vec![3, 4], vec![vec![3, 4], vec![1, 2]]),
1137        ];
1138        let mut local_stats = LocalPickerStatistic::default();
1139        for (input_level, sst_id_filter, expected) in &sst_id_filters {
1140            let option = ManualCompactionOption {
1141                sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1142                level: *input_level as _,
1143                key_range: KeyRange {
1144                    left: Bytes::default(),
1145                    right: Bytes::default(),
1146                    right_exclusive: false,
1147                },
1148                internal_table_id: HashSet::default(),
1149                target_level: None,
1150                exclusive: false,
1151            };
1152            let mut picker = ManualCompactionPicker::new(
1153                Arc::new(RangeOverlapStrategy::default()),
1154                option.clone(),
1155                input_level + 1,
1156            );
1157            let result = picker
1158                .pick_compaction(&levels, &levels_handler, &mut local_stats)
1159                .unwrap();
1160            assert_eq!(result.input_levels.len(), expected.len());
1161            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1162                assert_eq!(
1163                    result.input_levels[i]
1164                        .table_infos
1165                        .iter()
1166                        .map(|s| s.sst_id)
1167                        .collect_vec(),
1168                    *e
1169                );
1170            }
1171        }
1172    }
1173
1174    #[test]
1175    fn test_ln_to_ln() {
1176        let (levels, levels_handler) = generate_intra_test_levels();
1177        // (input_level, sst_id_filter, expected_result_input_level_ssts)
1178        let sst_id_filters = vec![
1179            (1, vec![1], vec![vec![1], vec![]]),
1180            (1, vec![3], vec![vec![3], vec![]]),
1181            (1, vec![4], vec![vec![4], vec![]]),
1182            (1, vec![3, 4], vec![vec![3, 4], vec![]]),
1183            (1, vec![1, 4], vec![vec![1, 2, 3, 4], vec![]]),
1184            (1, vec![2, 4], vec![vec![2, 3, 4], vec![]]),
1185            (1, vec![1, 3], vec![vec![1, 2, 3], vec![]]),
1186        ];
1187        for (input_level, sst_id_filter, expected) in &sst_id_filters {
1188            let option = ManualCompactionOption {
1189                sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1190                level: *input_level as _,
1191                key_range: KeyRange {
1192                    left: Bytes::default(),
1193                    right: Bytes::default(),
1194                    right_exclusive: false,
1195                },
1196                internal_table_id: HashSet::default(),
1197                target_level: None,
1198                exclusive: false,
1199            };
1200            let mut picker = ManualCompactionPicker::new(
1201                Arc::new(RangeOverlapStrategy::default()),
1202                option.clone(),
1203                *input_level as _,
1204            );
1205            let result = picker
1206                .pick_compaction(
1207                    &levels,
1208                    &levels_handler,
1209                    &mut LocalPickerStatistic::default(),
1210                )
1211                .unwrap();
1212            assert_eq!(result.input_levels.len(), expected.len());
1213            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1214                assert_eq!(
1215                    result.input_levels[i]
1216                        .table_infos
1217                        .iter()
1218                        .map(|s| s.sst_id)
1219                        .collect_vec(),
1220                    *e
1221                );
1222            }
1223        }
1224    }
1225
1226    #[test]
1227    fn test_manual_compaction_selector_l0() {
1228        let config = CompactionConfigBuilder::new().max_level(4).build();
1229        let group_config = CompactionGroup::new(1, config);
1230        let l0 = generate_l0_nonoverlapping_sublevels(vec![
1231            generate_table(0, 1, 0, 500, 1),
1232            generate_table(1, 1, 0, 500, 1),
1233        ]);
1234        assert_eq!(l0.sub_levels.len(), 2);
1235        let levels = vec![
1236            generate_level(1, vec![]),
1237            generate_level(2, vec![]),
1238            generate_level(3, vec![]),
1239            Level {
1240                level_idx: 4,
1241                level_type: LevelType::Nonoverlapping,
1242                table_infos: vec![
1243                    generate_table(2, 1, 0, 100, 1),
1244                    generate_table(3, 1, 101, 200, 1),
1245                    generate_table(4, 1, 222, 300, 1),
1246                ],
1247                ..Default::default()
1248            },
1249        ];
1250        assert_eq!(levels.len(), 4);
1251        let levels = Levels {
1252            levels,
1253            l0,
1254            ..Default::default()
1255        };
1256        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1257        let mut local_stats = LocalSelectorStatistic::default();
1258
1259        // pick_l0_to_sub_level
1260        {
1261            let option = ManualCompactionOption {
1262                sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1263                key_range: KeyRange {
1264                    left: Bytes::default(),
1265                    right: Bytes::default(),
1266                    right_exclusive: false,
1267                },
1268                internal_table_id: HashSet::default(),
1269                level: 0,
1270                target_level: None,
1271                exclusive: false,
1272            };
1273            let mut selector = ManualCompactionSelector::new(option);
1274            let task = selector
1275                .pick_compaction(
1276                    1,
1277                    compaction_selector_context(
1278                        &group_config,
1279                        &levels,
1280                        &BTreeSet::new(),
1281                        &mut levels_handler,
1282                        &mut local_stats,
1283                        &HashMap::default(),
1284                        Arc::new(CompactionDeveloperConfig::default()),
1285                        &Default::default(),
1286                        &HummockVersionStateTableInfo::empty(),
1287                    ),
1288                )
1289                .unwrap();
1290            assert_compaction_task(&task, &levels_handler);
1291            assert_eq!(task.input.input_levels.len(), 2);
1292            assert_eq!(task.input.input_levels[0].level_idx, 0);
1293            assert_eq!(task.input.input_levels[1].level_idx, 0);
1294            assert_eq!(task.input.target_level, 0);
1295        }
1296
1297        for level_handler in &mut levels_handler {
1298            for pending_task_id in &level_handler.pending_tasks_ids() {
1299                level_handler.remove_task(*pending_task_id);
1300            }
1301        }
1302
1303        // pick_l0_to_base_level
1304        {
1305            let option = ManualCompactionOption {
1306                sst_ids: vec![],
1307                key_range: KeyRange {
1308                    left: Bytes::default(),
1309                    right: Bytes::default(),
1310                    right_exclusive: false,
1311                },
1312                internal_table_id: HashSet::default(),
1313                level: 0,
1314                target_level: None,
1315                exclusive: false,
1316            };
1317            let mut selector = ManualCompactionSelector::new(option);
1318            let task = selector
1319                .pick_compaction(
1320                    2,
1321                    compaction_selector_context(
1322                        &group_config,
1323                        &levels,
1324                        &BTreeSet::new(),
1325                        &mut levels_handler,
1326                        &mut local_stats,
1327                        &HashMap::default(),
1328                        Arc::new(CompactionDeveloperConfig::default()),
1329                        &Default::default(),
1330                        &HummockVersionStateTableInfo::empty(),
1331                    ),
1332                )
1333                .unwrap();
1334            assert_compaction_task(&task, &levels_handler);
1335            assert_eq!(task.input.input_levels.len(), 3);
1336            assert_eq!(task.input.input_levels[0].level_idx, 0);
1337            assert_eq!(task.input.input_levels[1].level_idx, 0);
1338            assert_eq!(task.input.input_levels[2].level_idx, 4);
1339            assert_eq!(task.input.target_level, 4);
1340        }
1341    }
1342
1343    #[test]
1344    fn test_manual_compaction_selector_rejects_l0_non_base_target_level() {
1345        let config = CompactionConfigBuilder::new().max_level(4).build();
1346        let group_config = CompactionGroup::new(1, config);
1347        let l0 = generate_l0_nonoverlapping_sublevels(vec![
1348            generate_table(0, 1, 0, 500, 1),
1349            generate_table(1, 1, 0, 500, 1),
1350        ]);
1351        let levels = Levels {
1352            levels: vec![
1353                generate_level(1, vec![]),
1354                generate_level(2, vec![]),
1355                generate_level(3, vec![generate_table(2, 1, 0, 500, 1)]),
1356                generate_level(4, vec![]),
1357            ],
1358            l0,
1359            ..Default::default()
1360        };
1361        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1362        let mut local_stats = LocalSelectorStatistic::default();
1363        let option = ManualCompactionOption {
1364            sst_ids: vec![],
1365            key_range: KeyRange {
1366                left: Bytes::default(),
1367                right: Bytes::default(),
1368                right_exclusive: false,
1369            },
1370            internal_table_id: HashSet::default(),
1371            level: 0,
1372            target_level: Some(4),
1373            exclusive: false,
1374        };
1375        let mut selector = ManualCompactionSelector::new(option);
1376
1377        assert!(
1378            selector
1379                .pick_compaction(
1380                    1,
1381                    compaction_selector_context(
1382                        &group_config,
1383                        &levels,
1384                        &BTreeSet::new(),
1385                        &mut levels_handler,
1386                        &mut local_stats,
1387                        &HashMap::default(),
1388                        Arc::new(CompactionDeveloperConfig::default()),
1389                        &Default::default(),
1390                        &HummockVersionStateTableInfo::empty(),
1391                    ),
1392                )
1393                .is_none()
1394        );
1395        assert_eq!(
1396            selector.validation_error(),
1397            Some("target_level for L0 must be 3, got 4")
1398        );
1399    }
1400
1401    /// tests `DynamicLevelSelector::manual_pick_compaction`
1402    #[test]
1403    fn test_manual_compaction_selector() {
1404        let config = CompactionConfigBuilder::new().max_level(4).build();
1405        let group_config = CompactionGroup::new(1, config);
1406        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
1407        assert_eq!(l0.sub_levels.len(), 0);
1408        let levels = vec![
1409            generate_level(1, vec![]),
1410            generate_level(2, vec![]),
1411            generate_level(
1412                3,
1413                vec![
1414                    generate_table(0, 1, 150, 151, 1),
1415                    generate_table(1, 1, 250, 251, 1),
1416                ],
1417            ),
1418            Level {
1419                level_idx: 4,
1420                level_type: LevelType::Nonoverlapping,
1421                table_infos: vec![
1422                    generate_table(2, 1, 0, 100, 1),
1423                    generate_table(3, 1, 101, 200, 1),
1424                    generate_table(4, 1, 222, 300, 1),
1425                    generate_table(5, 1, 333, 400, 1),
1426                    generate_table(6, 1, 444, 500, 1),
1427                    generate_table(7, 1, 555, 600, 1),
1428                ],
1429                ..Default::default()
1430            },
1431        ];
1432        assert_eq!(levels.len(), 4);
1433        let levels = Levels {
1434            levels,
1435            l0,
1436            ..Default::default()
1437        };
1438        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1439        let mut local_stats = LocalSelectorStatistic::default();
1440
1441        // pick l3 -> l4
1442        {
1443            let option = ManualCompactionOption {
1444                sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1445                key_range: KeyRange {
1446                    left: Bytes::default(),
1447                    right: Bytes::default(),
1448                    right_exclusive: false,
1449                },
1450                internal_table_id: HashSet::default(),
1451                level: 3,
1452                target_level: None,
1453                exclusive: false,
1454            };
1455            let mut selector = ManualCompactionSelector::new(option);
1456            let task = selector
1457                .pick_compaction(
1458                    1,
1459                    compaction_selector_context(
1460                        &group_config,
1461                        &levels,
1462                        &BTreeSet::new(),
1463                        &mut levels_handler,
1464                        &mut local_stats,
1465                        &HashMap::default(),
1466                        Arc::new(CompactionDeveloperConfig::default()),
1467                        &Default::default(),
1468                        &HummockVersionStateTableInfo::empty(),
1469                    ),
1470                )
1471                .unwrap();
1472            assert_compaction_task(&task, &levels_handler);
1473            assert_eq!(task.input.input_levels.len(), 2);
1474            assert_eq!(task.input.input_levels[0].level_idx, 3);
1475            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
1476            assert_eq!(task.input.input_levels[1].level_idx, 4);
1477            assert_eq!(task.input.input_levels[1].table_infos.len(), 2);
1478            assert_eq!(task.input.target_level, 4);
1479        }
1480
1481        for level_handler in &mut levels_handler {
1482            for pending_task_id in &level_handler.pending_tasks_ids() {
1483                level_handler.remove_task(*pending_task_id);
1484            }
1485        }
1486
1487        // pick l3 -> l3
1488        {
1489            let option = ManualCompactionOption {
1490                sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1491                key_range: KeyRange {
1492                    left: Bytes::default(),
1493                    right: Bytes::default(),
1494                    right_exclusive: false,
1495                },
1496                internal_table_id: HashSet::default(),
1497                level: 3,
1498                target_level: Some(3),
1499                exclusive: false,
1500            };
1501            let mut selector = ManualCompactionSelector::new(option);
1502            let task = selector
1503                .pick_compaction(
1504                    2,
1505                    compaction_selector_context(
1506                        &group_config,
1507                        &levels,
1508                        &BTreeSet::new(),
1509                        &mut levels_handler,
1510                        &mut local_stats,
1511                        &HashMap::default(),
1512                        Arc::new(CompactionDeveloperConfig::default()),
1513                        &Default::default(),
1514                        &HummockVersionStateTableInfo::empty(),
1515                    ),
1516                )
1517                .unwrap();
1518            assert_compaction_task(&task, &levels_handler);
1519            assert_eq!(task.input.input_levels.len(), 2);
1520            assert_eq!(task.input.input_levels[0].level_idx, 3);
1521            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
1522            assert_eq!(task.input.input_levels[1].level_idx, 3);
1523            assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
1524            assert_eq!(task.input.target_level, 3);
1525        }
1526
1527        for level_handler in &mut levels_handler {
1528            for pending_task_id in &level_handler.pending_tasks_ids() {
1529                level_handler.remove_task(*pending_task_id);
1530            }
1531        }
1532
1533        // pick l4 -> l4
1534        {
1535            let option = ManualCompactionOption {
1536                sst_ids: vec![],
1537                key_range: KeyRange {
1538                    left: Bytes::default(),
1539                    right: Bytes::default(),
1540                    right_exclusive: false,
1541                },
1542                internal_table_id: HashSet::default(),
1543                level: 4,
1544                target_level: None,
1545                exclusive: false,
1546            };
1547            let mut selector = ManualCompactionSelector::new(option);
1548            let task = selector
1549                .pick_compaction(
1550                    1,
1551                    compaction_selector_context(
1552                        &group_config,
1553                        &levels,
1554                        &BTreeSet::new(),
1555                        &mut levels_handler,
1556                        &mut local_stats,
1557                        &HashMap::default(),
1558                        Arc::new(CompactionDeveloperConfig::default()),
1559                        &Default::default(),
1560                        &HummockVersionStateTableInfo::empty(),
1561                    ),
1562                )
1563                .unwrap();
1564            assert_compaction_task(&task, &levels_handler);
1565            assert_eq!(task.input.input_levels.len(), 2);
1566            assert_eq!(task.input.input_levels[0].level_idx, 4);
1567            assert_eq!(task.input.input_levels[0].table_infos.len(), 6);
1568            assert_eq!(task.input.input_levels[1].level_idx, 4);
1569            assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
1570            assert_eq!(task.input.target_level, 4);
1571            assert!(matches!(
1572                task.compaction_task_type,
1573                compact_task::TaskType::Manual
1574            ));
1575        }
1576    }
1577}