risingwave_meta/hummock/compaction/picker/
manual_compaction_picker.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_hummock_sdk::HummockSstableId;
20use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
21use risingwave_hummock_sdk::sstable_info::SstableInfo;
22use risingwave_pb::hummock::LevelType;
23
24use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
25use crate::hummock::compaction::overlap_strategy::{
26    OverlapInfo, OverlapStrategy, RangeOverlapInfo,
27};
28use crate::hummock::compaction::selector::ManualCompactionOption;
29use crate::hummock::level_handler::LevelHandler;
30
31pub struct ManualCompactionPicker {
32    overlap_strategy: Arc<dyn OverlapStrategy>,
33    option: ManualCompactionOption,
34    target_level: usize,
35}
36
37impl ManualCompactionPicker {
38    pub fn new(
39        overlap_strategy: Arc<dyn OverlapStrategy>,
40        option: ManualCompactionOption,
41        target_level: usize,
42    ) -> Self {
43        Self {
44            overlap_strategy,
45            option,
46            target_level,
47        }
48    }
49
50    fn pick_l0_to_sub_level(
51        &self,
52        l0: &OverlappingLevel,
53        level_handlers: &[LevelHandler],
54    ) -> Option<CompactionInput> {
55        assert_eq!(self.option.level, 0);
56        let mut input_levels = vec![];
57        let mut sub_level_id = 0;
58        let mut start_idx = None;
59        let mut end_idx = None;
60        // Decides the range of sub levels as input.
61        // We need pick consecutive sub_levels. See #5217.
62        for (idx, level) in l0.sub_levels.iter().enumerate() {
63            if !self.filter_level_by_option(level) {
64                continue;
65            }
66            if level_handlers[0].is_level_pending_compact(level) {
67                return None;
68            }
69            // Pick this sub_level.
70            if start_idx.is_none() {
71                sub_level_id = level.sub_level_id;
72                start_idx = Some(idx as u64);
73                end_idx = start_idx;
74            } else {
75                end_idx = Some(idx as u64);
76            }
77        }
78        let (start_idx, end_idx) = match (start_idx, end_idx) {
79            (Some(start_idx), Some(end_idx)) => (start_idx, end_idx),
80            _ => {
81                return None;
82            }
83        };
84        // Construct input.
85        for level in l0
86            .sub_levels
87            .iter()
88            .skip(start_idx as usize)
89            .take((end_idx - start_idx + 1) as usize)
90        {
91            input_levels.push(InputLevel {
92                level_idx: 0,
93                level_type: level.level_type,
94                table_infos: level.table_infos.clone(),
95            });
96        }
97        if input_levels.is_empty() {
98            return None;
99        }
100        input_levels.reverse();
101        Some(CompactionInput {
102            input_levels,
103            target_level: 0,
104            target_sub_level_id: sub_level_id,
105            ..Default::default()
106        })
107    }
108
109    fn pick_l0_to_base_level(
110        &self,
111        levels: &Levels,
112        level_handlers: &[LevelHandler],
113    ) -> Option<CompactionInput> {
114        assert!(self.option.level == 0 && self.target_level > 0);
115        for l in 1..self.target_level {
116            assert!(levels.levels[l - 1].table_infos.is_empty());
117        }
118        let l0 = &levels.l0;
119        let mut input_levels = vec![];
120        let mut max_sub_level_idx = usize::MAX;
121        let mut info = self.overlap_strategy.create_overlap_info();
122        // Decides the range of sub levels as input.
123        for (idx, level) in l0.sub_levels.iter().enumerate() {
124            if !self.filter_level_by_option(level) {
125                continue;
126            }
127            if level_handlers[0].is_level_pending_compact(level) {
128                return None;
129            }
130
131            // Pick this sub_level.
132            max_sub_level_idx = idx;
133        }
134        if max_sub_level_idx == usize::MAX {
135            return None;
136        }
137        // Construct input.
138        for idx in 0..=max_sub_level_idx {
139            for table in &l0.sub_levels[idx].table_infos {
140                info.update(&table.key_range);
141            }
142            input_levels.push(InputLevel {
143                level_idx: 0,
144                level_type: l0.sub_levels[idx].level_type,
145                table_infos: l0.sub_levels[idx].table_infos.clone(),
146            })
147        }
148        let target_input_ssts_range =
149            info.check_multiple_overlap(&levels.levels[self.target_level - 1].table_infos);
150        let target_input_ssts = if target_input_ssts_range.is_empty() {
151            vec![]
152        } else {
153            levels.levels[self.target_level - 1].table_infos[target_input_ssts_range].to_vec()
154        };
155        if target_input_ssts
156            .iter()
157            .any(|table| level_handlers[self.target_level].is_pending_compact(&table.sst_id))
158        {
159            return None;
160        }
161        if input_levels.is_empty() {
162            return None;
163        }
164        input_levels.reverse();
165        input_levels.push(InputLevel {
166            level_idx: self.target_level as u32,
167            level_type: LevelType::Nonoverlapping,
168            table_infos: target_input_ssts,
169        });
170
171        Some(CompactionInput {
172            input_levels,
173            target_level: self.target_level,
174            target_sub_level_id: 0,
175            ..Default::default()
176        })
177    }
178
179    /// Returns false if the given `sst` is rejected by filter defined by `option`.
180    /// Otherwise returns true.
181    fn filter_level_by_option(&self, level: &Level) -> bool {
182        let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
183        hint_sst_ids.extend(self.option.sst_ids.iter());
184        if self
185            .overlap_strategy
186            .check_overlap_with_range(&self.option.key_range, &level.table_infos)
187            .is_empty()
188        {
189            return false;
190        }
191        if !hint_sst_ids.is_empty()
192            && !level
193                .table_infos
194                .iter()
195                .any(|t| hint_sst_ids.contains(&t.sst_id))
196        {
197            return false;
198        }
199        if !self.option.internal_table_id.is_empty()
200            && !level.table_infos.iter().any(|sst_info| {
201                sst_info
202                    .table_ids
203                    .iter()
204                    .any(|t| self.option.internal_table_id.contains(t))
205            })
206        {
207            return false;
208        }
209        true
210    }
211}
212
213impl CompactionPicker for ManualCompactionPicker {
214    fn pick_compaction(
215        &mut self,
216        levels: &Levels,
217        level_handlers: &[LevelHandler],
218        _stats: &mut LocalPickerStatistic,
219    ) -> Option<CompactionInput> {
220        if self.option.exclusive
221            && level_handlers
222                .iter()
223                .any(|level_handler| level_handler.pending_file_count() > 0)
224        {
225            return None;
226        }
227        if self.option.level == 0 {
228            if !self.option.sst_ids.is_empty() {
229                return self.pick_l0_to_sub_level(&levels.l0, level_handlers);
230            } else if self.target_level > 0 {
231                return self.pick_l0_to_base_level(levels, level_handlers);
232            } else {
233                return None;
234            }
235        }
236        let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
237        hint_sst_ids.extend(self.option.sst_ids.iter());
238        let mut range_overlap_info = RangeOverlapInfo::default();
239        range_overlap_info.update(&self.option.key_range);
240        let level = self.option.level;
241        let target_level = self.target_level;
242        assert!(
243            self.option.level == self.target_level || self.option.level + 1 == self.target_level
244        );
245        // We either include all `select_input_ssts` as input, or return None.
246        let mut select_input_ssts: Vec<SstableInfo> = levels
247            .get_level(self.option.level)
248            .table_infos
249            .iter()
250            .filter(|sst_info| hint_sst_ids.is_empty() || hint_sst_ids.contains(&sst_info.sst_id))
251            .filter(|sst_info| range_overlap_info.check_overlap(sst_info))
252            .filter(|sst_info| {
253                if self.option.internal_table_id.is_empty() {
254                    return true;
255                }
256
257                // to filter sst_file by table_id
258                for table_id in &sst_info.table_ids {
259                    if self.option.internal_table_id.contains(table_id) {
260                        return true;
261                    }
262                }
263                false
264            })
265            .cloned()
266            .collect();
267        if select_input_ssts.is_empty() {
268            return None;
269        }
270        let target_input_ssts = if target_level == level {
271            // For intra level compaction, input SSTs must be consecutive.
272            let (left, _) = levels
273                .get_level(level)
274                .table_infos
275                .iter()
276                .find_position(|p| p.sst_id == select_input_ssts.first().unwrap().sst_id)
277                .unwrap();
278            let (right, _) = levels
279                .get_level(level)
280                .table_infos
281                .iter()
282                .find_position(|p| p.sst_id == select_input_ssts.last().unwrap().sst_id)
283                .unwrap();
284            select_input_ssts = levels.get_level(level).table_infos[left..=right].to_vec();
285            vec![]
286        } else {
287            self.overlap_strategy.check_base_level_overlap(
288                &select_input_ssts,
289                &levels.get_level(target_level).table_infos,
290            )
291        };
292        if select_input_ssts
293            .iter()
294            .any(|table| level_handlers[level].is_pending_compact(&table.sst_id))
295        {
296            return None;
297        }
298        if target_input_ssts
299            .iter()
300            .any(|table| level_handlers[target_level].is_pending_compact(&table.sst_id))
301        {
302            return None;
303        }
304
305        Some(CompactionInput {
306            select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
307            target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
308            total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
309            input_levels: vec![
310                InputLevel {
311                    level_idx: level as u32,
312                    level_type: levels.levels[level - 1].level_type,
313                    table_infos: select_input_ssts,
314                },
315                InputLevel {
316                    level_idx: target_level as u32,
317                    level_type: levels.levels[target_level - 1].level_type,
318                    table_infos: target_input_ssts,
319                },
320            ],
321            target_level,
322            ..Default::default()
323        })
324    }
325}
326
327#[cfg(test)]
328pub mod tests {
329    use std::collections::{BTreeSet, HashMap};
330
331    use bytes::Bytes;
332    use risingwave_hummock_sdk::key_range::KeyRange;
333    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
334    use risingwave_pb::hummock::compact_task;
335
336    use super::*;
337    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
338    use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
339    use crate::hummock::compaction::selector::tests::{
340        assert_compaction_task, generate_l0_nonoverlapping_sublevels,
341        generate_l0_overlapping_sublevels, generate_level, generate_table,
342    };
343    use crate::hummock::compaction::selector::{CompactionSelector, ManualCompactionSelector};
344    use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
345    use crate::hummock::model::CompactionGroup;
346    use crate::hummock::test_utils::{compaction_selector_context, iterator_test_key_of_epoch};
347
348    fn clean_task_state(level_handler: &mut LevelHandler) {
349        for pending_task_id in &level_handler.pending_tasks_ids() {
350            level_handler.remove_task(*pending_task_id);
351        }
352    }
353
354    fn is_l0_to_lbase(compaction_input: &CompactionInput) -> bool {
355        compaction_input
356            .input_levels
357            .iter()
358            .take(compaction_input.input_levels.len() - 1)
359            .all(|i| i.level_idx == 0)
360            && compaction_input
361                .input_levels
362                .iter()
363                .last()
364                .unwrap()
365                .level_idx as usize
366                == compaction_input.target_level
367            && compaction_input.target_level > 0
368    }
369
370    fn is_l0_to_l0(compaction_input: &CompactionInput) -> bool {
371        compaction_input
372            .input_levels
373            .iter()
374            .all(|i| i.level_idx == 0)
375            && compaction_input.target_level == 0
376    }
377
378    #[test]
379    fn test_manual_compaction_picker() {
380        let levels = vec![
381            Level {
382                level_idx: 1,
383                level_type: LevelType::Nonoverlapping,
384                table_infos: vec![
385                    generate_table(0, 1, 0, 100, 1),
386                    generate_table(1, 1, 101, 200, 1),
387                    generate_table(2, 1, 222, 300, 1),
388                ],
389                ..Default::default()
390            },
391            Level {
392                level_idx: 2,
393                level_type: LevelType::Nonoverlapping,
394                table_infos: vec![
395                    generate_table(4, 1, 0, 100, 1),
396                    generate_table(5, 1, 101, 150, 1),
397                    generate_table(6, 1, 151, 201, 1),
398                    generate_table(7, 1, 501, 800, 1),
399                    generate_table(8, 2, 301, 400, 1),
400                ],
401                ..Default::default()
402            },
403        ];
404        let mut levels = Levels {
405            levels,
406            l0: generate_l0_nonoverlapping_sublevels(vec![]),
407            ..Default::default()
408        };
409        let mut levels_handler = vec![
410            LevelHandler::new(0),
411            LevelHandler::new(1),
412            LevelHandler::new(2),
413        ];
414        let mut local_stats = LocalPickerStatistic::default();
415
416        {
417            // test key_range option
418            let option = ManualCompactionOption {
419                level: 1,
420                key_range: KeyRange {
421                    left: Bytes::from(iterator_test_key_of_epoch(1, 0, 1)),
422                    right: Bytes::from(iterator_test_key_of_epoch(1, 201, 1)),
423                    right_exclusive: false,
424                },
425                ..Default::default()
426            };
427
428            let target_level = option.level + 1;
429            let mut picker = ManualCompactionPicker::new(
430                Arc::new(RangeOverlapStrategy::default()),
431                option,
432                target_level,
433            );
434            let result = picker
435                .pick_compaction(&levels, &levels_handler, &mut local_stats)
436                .unwrap();
437            result.add_pending_task(0, &mut levels_handler);
438
439            assert_eq!(2, result.input_levels[0].table_infos.len());
440            assert_eq!(3, result.input_levels[1].table_infos.len());
441        }
442
443        {
444            clean_task_state(&mut levels_handler[1]);
445            clean_task_state(&mut levels_handler[2]);
446
447            // test all key range
448            let option = ManualCompactionOption::default();
449            let target_level = option.level + 1;
450            let mut picker = ManualCompactionPicker::new(
451                Arc::new(RangeOverlapStrategy::default()),
452                option,
453                target_level,
454            );
455            let result = picker
456                .pick_compaction(&levels, &levels_handler, &mut local_stats)
457                .unwrap();
458            result.add_pending_task(0, &mut levels_handler);
459
460            assert_eq!(3, result.input_levels[0].table_infos.len());
461            assert_eq!(3, result.input_levels[1].table_infos.len());
462        }
463
464        {
465            clean_task_state(&mut levels_handler[1]);
466            clean_task_state(&mut levels_handler[2]);
467
468            let level_table_info = &mut levels.levels[0].table_infos;
469            let table_info_1 = &mut level_table_info[1];
470            let mut t_inner = table_info_1.get_inner();
471            t_inner.table_ids.resize(2, 0.into());
472            t_inner.table_ids[0] = 1.into();
473            t_inner.table_ids[1] = 2.into();
474            *table_info_1 = t_inner.into();
475
476            // test internal_table_id
477            let option = ManualCompactionOption {
478                level: 1,
479                internal_table_id: HashSet::from([2.into()]),
480                ..Default::default()
481            };
482
483            let target_level = option.level + 1;
484            let mut picker = ManualCompactionPicker::new(
485                Arc::new(RangeOverlapStrategy::default()),
486                option,
487                target_level,
488            );
489
490            let result = picker
491                .pick_compaction(&levels, &levels_handler, &mut local_stats)
492                .unwrap();
493            result.add_pending_task(0, &mut levels_handler);
494
495            assert_eq!(1, result.input_levels[0].table_infos.len());
496            assert_eq!(2, result.input_levels[1].table_infos.len());
497        }
498
499        {
500            clean_task_state(&mut levels_handler[1]);
501            clean_task_state(&mut levels_handler[2]);
502
503            // include all table_info
504            let level_table_info = &mut levels.levels[0].table_infos;
505            for table_info in level_table_info {
506                let mut t_inner = table_info.get_inner();
507                t_inner.table_ids.resize(2, 0.into());
508                t_inner.table_ids[0] = 1.into();
509                t_inner.table_ids[1] = 2.into();
510                *table_info = t_inner.into();
511            }
512
513            // test key range filter first
514            let option = ManualCompactionOption {
515                sst_ids: vec![],
516                level: 1,
517                key_range: KeyRange {
518                    left: Bytes::from(iterator_test_key_of_epoch(1, 101, 1)),
519                    right: Bytes::from(iterator_test_key_of_epoch(1, 199, 1)),
520                    right_exclusive: false,
521                },
522                internal_table_id: HashSet::from([2.into()]),
523                exclusive: false,
524            };
525
526            let target_level = option.level + 1;
527            let mut picker = ManualCompactionPicker::new(
528                Arc::new(RangeOverlapStrategy::default()),
529                option,
530                target_level,
531            );
532
533            let result = picker
534                .pick_compaction(&levels, &levels_handler, &mut local_stats)
535                .unwrap();
536
537            assert_eq!(1, result.input_levels[0].table_infos.len());
538            assert_eq!(2, result.input_levels[1].table_infos.len());
539        }
540    }
541
542    #[test]
543    fn test_manual_compaction_exclusive_blocked_by_pending() {
544        let (levels, mut levels_handler) = generate_test_levels();
545        let option = ManualCompactionOption {
546            exclusive: true,
547            ..Default::default()
548        };
549        let target_level = option.level + 1;
550        let mut picker = ManualCompactionPicker::new(
551            Arc::new(RangeOverlapStrategy::default()),
552            option,
553            target_level,
554        );
555
556        let pending_sst_id = levels.levels[0].table_infos[0].sst_id;
557        levels_handler[1].test_add_pending_sst(pending_sst_id, 1);
558
559        assert!(
560            picker
561                .pick_compaction(
562                    &levels,
563                    &levels_handler,
564                    &mut LocalPickerStatistic::default()
565                )
566                .is_none()
567        );
568    }
569
570    fn generate_test_levels() -> (Levels, Vec<LevelHandler>) {
571        let mut l0 = generate_l0_overlapping_sublevels(vec![
572            vec![
573                generate_table(5, 1, 0, 500, 2),
574                generate_table(6, 2, 600, 1000, 2),
575            ],
576            vec![
577                generate_table(7, 1, 0, 500, 3),
578                generate_table(8, 2, 600, 1000, 3),
579            ],
580            vec![
581                generate_table(9, 1, 300, 500, 4),
582                generate_table(10, 2, 600, 1000, 4),
583            ],
584        ]);
585        // Set a nonoverlapping sub_level.
586        l0.sub_levels[1].level_type = LevelType::Nonoverlapping as _;
587        assert_eq!(l0.sub_levels.len(), 3);
588        let mut levels = vec![
589            Level {
590                level_idx: 1,
591                level_type: LevelType::Nonoverlapping,
592                table_infos: vec![
593                    generate_table(3, 1, 0, 100, 1),
594                    generate_table(4, 2, 2000, 3000, 1),
595                ],
596                ..Default::default()
597            },
598            Level {
599                level_idx: 2,
600                level_type: LevelType::Nonoverlapping,
601                table_infos: vec![
602                    generate_table(1, 1, 0, 100, 1),
603                    generate_table(2, 2, 2000, 3000, 1),
604                ],
605                ..Default::default()
606            },
607        ];
608        // Set internal_table_ids.
609        assert_eq!(levels.len(), 2);
610        for iter in [l0.sub_levels.iter_mut(), levels.iter_mut()] {
611            for (idx, l) in iter.enumerate() {
612                for t in &mut l.table_infos {
613                    let mut t_inner = t.get_inner();
614                    t_inner.table_ids.clear();
615                    if idx == 0 {
616                        t_inner
617                            .table_ids
618                            .push((((t.sst_id.as_raw_id() % 2) + 1) as u32).into());
619                    } else {
620                        t_inner.table_ids.push(3.into());
621                    }
622                    *t = t_inner.into();
623                }
624            }
625        }
626        let levels = Levels {
627            levels,
628            l0,
629            ..Default::default()
630        };
631
632        let levels_handler = vec![
633            LevelHandler::new(0),
634            LevelHandler::new(1),
635            LevelHandler::new(2),
636        ];
637        (levels, levels_handler)
638    }
639
640    fn generate_intra_test_levels() -> (Levels, Vec<LevelHandler>) {
641        let l0 = generate_l0_overlapping_sublevels(vec![]);
642        let levels = vec![Level {
643            level_idx: 1,
644            level_type: LevelType::Nonoverlapping,
645            table_infos: vec![
646                generate_table(1, 1, 0, 100, 1),
647                generate_table(2, 2, 100, 200, 1),
648                generate_table(3, 2, 200, 300, 1),
649                generate_table(4, 2, 300, 400, 1),
650            ],
651            ..Default::default()
652        }];
653        let levels = Levels {
654            levels,
655            l0,
656            ..Default::default()
657        };
658
659        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
660        (levels, levels_handler)
661    }
662
663    #[test]
664    fn test_l0_empty() {
665        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
666        let levels = vec![Level {
667            level_idx: 1,
668            level_type: LevelType::Nonoverlapping,
669            table_infos: vec![],
670            total_file_size: 0,
671            sub_level_id: 0,
672            uncompressed_file_size: 0,
673            ..Default::default()
674        }];
675        let levels = Levels {
676            levels,
677            l0,
678            ..Default::default()
679        };
680        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
681        let option = ManualCompactionOption {
682            sst_ids: vec![1.into()],
683            level: 0,
684            key_range: KeyRange {
685                left: Bytes::default(),
686                right: Bytes::default(),
687                right_exclusive: false,
688            },
689            internal_table_id: HashSet::default(),
690            exclusive: false,
691        };
692        let mut picker =
693            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 0);
694        assert!(
695            picker
696                .pick_compaction(
697                    &levels,
698                    &levels_handler,
699                    &mut LocalPickerStatistic::default()
700                )
701                .is_none()
702        );
703    }
704
705    #[test]
706    fn test_l0_basic() {
707        let (levels, levels_handler) = generate_test_levels();
708
709        // target_level == 0, None
710        let option = ManualCompactionOption {
711            sst_ids: vec![],
712            level: 0,
713            key_range: KeyRange {
714                left: Bytes::default(),
715                right: Bytes::default(),
716                right_exclusive: false,
717            },
718            internal_table_id: HashSet::default(),
719            exclusive: false,
720        };
721        let mut picker = ManualCompactionPicker::new(
722            Arc::new(RangeOverlapStrategy::default()),
723            option.clone(),
724            0,
725        );
726        let mut local_stats = LocalPickerStatistic::default();
727        assert!(
728            picker
729                .pick_compaction(&levels, &levels_handler, &mut local_stats)
730                .is_none()
731        );
732
733        // pick_l0_to_base_level
734        let mut picker =
735            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
736        let mut expected = [vec![5, 6], vec![7, 8], vec![9, 10]];
737        expected.reverse();
738        let result = picker
739            .pick_compaction(&levels, &levels_handler, &mut local_stats)
740            .unwrap();
741        assert_eq!(result.input_levels.len(), 4);
742        assert!(is_l0_to_lbase(&result));
743        assert_eq!(result.target_level, 1);
744        for (l, e) in expected.iter().enumerate().take(3) {
745            assert_eq!(
746                result.input_levels[l]
747                    .table_infos
748                    .iter()
749                    .map(|s| s.sst_id)
750                    .collect_vec(),
751                *e
752            );
753        }
754        assert_eq!(
755            result.input_levels[3].table_infos,
756            vec![levels.levels[0].table_infos[0].clone()]
757        );
758
759        // pick_l0_to_base_level, filtered by key_range
760        let option = ManualCompactionOption {
761            sst_ids: vec![],
762            level: 0,
763            key_range: KeyRange {
764                left: Bytes::from(iterator_test_key_of_epoch(1, 0, 2)),
765                right: Bytes::from(iterator_test_key_of_epoch(1, 200, 2)),
766                right_exclusive: false,
767            },
768            internal_table_id: HashSet::default(),
769            exclusive: false,
770        };
771        let mut picker =
772            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
773        let mut expected = [vec![5, 6], vec![7, 8]];
774        expected.reverse();
775        let result = picker
776            .pick_compaction(&levels, &levels_handler, &mut local_stats)
777            .unwrap();
778        assert_eq!(result.input_levels.len(), 3);
779        assert!(is_l0_to_lbase(&result));
780        assert_eq!(result.target_level, 1);
781        for (l, e) in expected.iter().enumerate().take(2) {
782            assert_eq!(
783                result.input_levels[l]
784                    .table_infos
785                    .iter()
786                    .map(|s| s.sst_id)
787                    .collect_vec(),
788                *e
789            );
790        }
791        assert_eq!(
792            result.input_levels[2].table_infos,
793            vec![levels.levels[0].table_infos[0].clone()]
794        );
795    }
796
797    #[test]
798    fn test_l0_to_l0_option_sst_ids() {
799        let (levels, levels_handler) = generate_test_levels();
800        // (input_level, sst_id_filter, expected_result_input_level_ssts)
801        let sst_id_filters = vec![
802            (0, vec![6], vec![vec![5, 6]]),
803            (0, vec![7], vec![vec![7, 8]]),
804            (0, vec![9], vec![vec![9, 10]]),
805            (0, vec![6, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
806            (0, vec![8, 9], vec![vec![7, 8], vec![9, 10]]),
807            (0, vec![6, 8, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
808        ];
809        let mut local_stats = LocalPickerStatistic::default();
810        for (input_level, sst_id_filter, expected) in &sst_id_filters {
811            let expected = expected.iter().rev().cloned().collect_vec();
812            let option = ManualCompactionOption {
813                sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
814                level: *input_level as _,
815                key_range: KeyRange {
816                    left: Bytes::default(),
817                    right: Bytes::default(),
818                    right_exclusive: false,
819                },
820                internal_table_id: HashSet::default(),
821                exclusive: false,
822            };
823            let mut picker = ManualCompactionPicker::new(
824                Arc::new(RangeOverlapStrategy::default()),
825                option.clone(),
826                // l0 to l0 will ignore target_level
827                input_level + 1,
828            );
829            let result = picker
830                .pick_compaction(&levels, &levels_handler, &mut local_stats)
831                .unwrap();
832            assert!(is_l0_to_l0(&result));
833            assert_eq!(result.input_levels.len(), expected.len());
834            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
835                assert_eq!(
836                    result.input_levels[i]
837                        .table_infos
838                        .iter()
839                        .map(|s| s.sst_id)
840                        .collect_vec(),
841                    *e
842                );
843            }
844        }
845    }
846
847    #[test]
848    fn test_l0_to_lbase_option_internal_table() {
849        let (levels, mut levels_handler) = generate_test_levels();
850        let input_level = 0;
851        let target_level = input_level + 1;
852        let mut local_stats = LocalPickerStatistic::default();
853        {
854            let option = ManualCompactionOption {
855                sst_ids: vec![],
856                level: input_level,
857                key_range: KeyRange {
858                    left: Bytes::default(),
859                    right: Bytes::default(),
860                    right_exclusive: false,
861                },
862                // No matching internal table id.
863                internal_table_id: HashSet::from([100.into()]),
864                exclusive: false,
865            };
866            let mut picker = ManualCompactionPicker::new(
867                Arc::new(RangeOverlapStrategy::default()),
868                option,
869                target_level,
870            );
871            assert!(
872                picker
873                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
874                    .is_none()
875            )
876        }
877
878        {
879            let option = ManualCompactionOption {
880                sst_ids: vec![],
881                level: input_level,
882                key_range: KeyRange {
883                    left: Bytes::default(),
884                    right: Bytes::default(),
885                    right_exclusive: false,
886                },
887                // Include all sub level's table ids
888                internal_table_id: HashSet::from([1.into(), 2.into(), 3.into()]),
889                exclusive: false,
890            };
891            let mut picker = ManualCompactionPicker::new(
892                Arc::new(RangeOverlapStrategy::default()),
893                option,
894                target_level,
895            );
896            let result = picker
897                .pick_compaction(&levels, &levels_handler, &mut local_stats)
898                .unwrap();
899            assert_eq!(result.input_levels.len(), 4);
900            assert!(is_l0_to_lbase(&result));
901            assert_eq!(result.target_level, 1);
902            assert!(is_l0_to_lbase(&result));
903            assert_eq!(
904                result
905                    .input_levels
906                    .iter()
907                    .take(3)
908                    .flat_map(|s| s.table_infos.clone())
909                    .map(|s| s.sst_id)
910                    .collect_vec(),
911                vec![9, 10, 7, 8, 5, 6]
912            );
913            assert_eq!(
914                result.input_levels[3]
915                    .table_infos
916                    .iter()
917                    .map(|s| s.sst_id)
918                    .collect_vec(),
919                vec![3]
920            );
921        }
922
923        {
924            let option = ManualCompactionOption {
925                sst_ids: vec![],
926                level: input_level,
927                key_range: KeyRange {
928                    left: Bytes::default(),
929                    right: Bytes::default(),
930                    right_exclusive: false,
931                },
932                // Only include bottom sub level's table id
933                internal_table_id: HashSet::from([3.into()]),
934                exclusive: false,
935            };
936            let mut picker = ManualCompactionPicker::new(
937                Arc::new(RangeOverlapStrategy::default()),
938                option,
939                target_level,
940            );
941            let result = picker
942                .pick_compaction(&levels, &levels_handler, &mut local_stats)
943                .unwrap();
944            assert_eq!(result.input_levels.len(), 4);
945            assert!(is_l0_to_lbase(&result));
946            assert_eq!(
947                result
948                    .input_levels
949                    .iter()
950                    .take(3)
951                    .flat_map(|s| s.table_infos.clone())
952                    .map(|s| s.sst_id)
953                    .collect_vec(),
954                vec![9, 10, 7, 8, 5, 6]
955            );
956            assert_eq!(
957                result.input_levels[3]
958                    .table_infos
959                    .iter()
960                    .map(|s| s.sst_id)
961                    .collect_vec(),
962                vec![3]
963            );
964            assert_eq!(result.target_level, 1);
965        }
966
967        {
968            let option = ManualCompactionOption {
969                sst_ids: vec![],
970                level: input_level,
971                key_range: KeyRange {
972                    left: Bytes::default(),
973                    right: Bytes::default(),
974                    right_exclusive: false,
975                },
976                // Only include partial top sub level's table id, but the whole top sub level is
977                // picked.
978                internal_table_id: HashSet::from([1.into()]),
979                exclusive: false,
980            };
981            let mut picker = ManualCompactionPicker::new(
982                Arc::new(RangeOverlapStrategy::default()),
983                option,
984                target_level,
985            );
986            let result = picker
987                .pick_compaction(&levels, &levels_handler, &mut local_stats)
988                .unwrap();
989            result.add_pending_task(0, &mut levels_handler);
990            assert_eq!(result.input_levels.len(), 2);
991            assert!(is_l0_to_lbase(&result));
992            assert_eq!(result.target_level, 1);
993            assert_eq!(
994                result
995                    .input_levels
996                    .iter()
997                    .take(1)
998                    .flat_map(|s| s.table_infos.clone())
999                    .map(|s| s.sst_id)
1000                    .collect_vec(),
1001                vec![5, 6]
1002            );
1003            assert_eq!(
1004                result.input_levels[1]
1005                    .table_infos
1006                    .iter()
1007                    .map(|s| s.sst_id)
1008                    .collect_vec(),
1009                vec![3]
1010            );
1011
1012            // Pick bottom sub level while top sub level is pending
1013            let option = ManualCompactionOption {
1014                sst_ids: vec![],
1015                level: input_level,
1016                key_range: KeyRange {
1017                    left: Bytes::default(),
1018                    right: Bytes::default(),
1019                    right_exclusive: false,
1020                },
1021                // Only include bottom sub level's table id
1022                internal_table_id: HashSet::from([3.into()]),
1023                exclusive: false,
1024            };
1025            let mut picker = ManualCompactionPicker::new(
1026                Arc::new(RangeOverlapStrategy::default()),
1027                option,
1028                target_level,
1029            );
1030            // Because top sub-level is pending.
1031            assert!(
1032                picker
1033                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
1034                    .is_none()
1035            );
1036
1037            clean_task_state(&mut levels_handler[0]);
1038            clean_task_state(&mut levels_handler[1]);
1039        }
1040    }
1041
1042    #[test]
1043    fn test_ln_to_lnext_option_internal_table() {
1044        let (levels, levels_handler) = generate_test_levels();
1045        let input_level = 1;
1046        let target_level = input_level + 1;
1047        let mut local_stats = LocalPickerStatistic::default();
1048        {
1049            let option = ManualCompactionOption {
1050                sst_ids: vec![],
1051                level: input_level,
1052                key_range: KeyRange {
1053                    left: Bytes::default(),
1054                    right: Bytes::default(),
1055                    right_exclusive: false,
1056                },
1057                // No matching internal table id.
1058                internal_table_id: HashSet::from([100.into()]),
1059                exclusive: false,
1060            };
1061            let mut picker = ManualCompactionPicker::new(
1062                Arc::new(RangeOverlapStrategy::default()),
1063                option,
1064                target_level,
1065            );
1066            assert!(
1067                picker
1068                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
1069                    .is_none()
1070            )
1071        }
1072
1073        {
1074            let expected_input_level_sst_ids = [vec![4], vec![2]];
1075            let option = ManualCompactionOption {
1076                sst_ids: vec![],
1077                level: input_level,
1078                key_range: KeyRange {
1079                    left: Bytes::default(),
1080                    right: Bytes::default(),
1081                    right_exclusive: false,
1082                },
1083                // Only include partial input level's table id
1084                internal_table_id: HashSet::from([1.into()]),
1085                exclusive: false,
1086            };
1087            let mut picker = ManualCompactionPicker::new(
1088                Arc::new(RangeOverlapStrategy::default()),
1089                option,
1090                target_level,
1091            );
1092            let result = picker
1093                .pick_compaction(&levels, &levels_handler, &mut local_stats)
1094                .unwrap();
1095            assert_eq!(
1096                result.input_levels.len(),
1097                expected_input_level_sst_ids.len()
1098            );
1099            assert_eq!(result.target_level, target_level);
1100            for (l, e) in expected_input_level_sst_ids
1101                .iter()
1102                .enumerate()
1103                .take(result.input_levels.len())
1104            {
1105                assert_eq!(
1106                    result.input_levels[l]
1107                        .table_infos
1108                        .iter()
1109                        .map(|s| s.sst_id)
1110                        .collect_vec(),
1111                    *e
1112                );
1113            }
1114        }
1115    }
1116
1117    #[test]
1118    fn test_ln_to_lnext_option_sst_ids() {
1119        let (levels, levels_handler) = generate_test_levels();
1120        // (input_level, sst_id_filter, expected_result_input_level_ssts)
1121        let sst_id_filters = vec![
1122            (1, vec![3], vec![vec![3], vec![1]]),
1123            (1, vec![4], vec![vec![4], vec![2]]),
1124            (1, vec![3, 4], vec![vec![3, 4], vec![1, 2]]),
1125        ];
1126        let mut local_stats = LocalPickerStatistic::default();
1127        for (input_level, sst_id_filter, expected) in &sst_id_filters {
1128            let option = ManualCompactionOption {
1129                sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1130                level: *input_level as _,
1131                key_range: KeyRange {
1132                    left: Bytes::default(),
1133                    right: Bytes::default(),
1134                    right_exclusive: false,
1135                },
1136                internal_table_id: HashSet::default(),
1137                exclusive: false,
1138            };
1139            let mut picker = ManualCompactionPicker::new(
1140                Arc::new(RangeOverlapStrategy::default()),
1141                option.clone(),
1142                input_level + 1,
1143            );
1144            let result = picker
1145                .pick_compaction(&levels, &levels_handler, &mut local_stats)
1146                .unwrap();
1147            assert_eq!(result.input_levels.len(), expected.len());
1148            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1149                assert_eq!(
1150                    result.input_levels[i]
1151                        .table_infos
1152                        .iter()
1153                        .map(|s| s.sst_id)
1154                        .collect_vec(),
1155                    *e
1156                );
1157            }
1158        }
1159    }
1160
1161    #[test]
1162    fn test_ln_to_ln() {
1163        let (levels, levels_handler) = generate_intra_test_levels();
1164        // (input_level, sst_id_filter, expected_result_input_level_ssts)
1165        let sst_id_filters = vec![
1166            (1, vec![1], vec![vec![1], vec![]]),
1167            (1, vec![3], vec![vec![3], vec![]]),
1168            (1, vec![4], vec![vec![4], vec![]]),
1169            (1, vec![3, 4], vec![vec![3, 4], vec![]]),
1170            (1, vec![1, 4], vec![vec![1, 2, 3, 4], vec![]]),
1171            (1, vec![2, 4], vec![vec![2, 3, 4], vec![]]),
1172            (1, vec![1, 3], vec![vec![1, 2, 3], vec![]]),
1173        ];
1174        for (input_level, sst_id_filter, expected) in &sst_id_filters {
1175            let option = ManualCompactionOption {
1176                sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1177                level: *input_level as _,
1178                key_range: KeyRange {
1179                    left: Bytes::default(),
1180                    right: Bytes::default(),
1181                    right_exclusive: false,
1182                },
1183                internal_table_id: HashSet::default(),
1184                exclusive: false,
1185            };
1186            let mut picker = ManualCompactionPicker::new(
1187                Arc::new(RangeOverlapStrategy::default()),
1188                option.clone(),
1189                *input_level as _,
1190            );
1191            let result = picker
1192                .pick_compaction(
1193                    &levels,
1194                    &levels_handler,
1195                    &mut LocalPickerStatistic::default(),
1196                )
1197                .unwrap();
1198            assert_eq!(result.input_levels.len(), expected.len());
1199            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1200                assert_eq!(
1201                    result.input_levels[i]
1202                        .table_infos
1203                        .iter()
1204                        .map(|s| s.sst_id)
1205                        .collect_vec(),
1206                    *e
1207                );
1208            }
1209        }
1210    }
1211
1212    #[test]
1213    fn test_manual_compaction_selector_l0() {
1214        let config = CompactionConfigBuilder::new().max_level(4).build();
1215        let group_config = CompactionGroup::new(1, config);
1216        let l0 = generate_l0_nonoverlapping_sublevels(vec![
1217            generate_table(0, 1, 0, 500, 1),
1218            generate_table(1, 1, 0, 500, 1),
1219        ]);
1220        assert_eq!(l0.sub_levels.len(), 2);
1221        let levels = vec![
1222            generate_level(1, vec![]),
1223            generate_level(2, vec![]),
1224            generate_level(3, vec![]),
1225            Level {
1226                level_idx: 4,
1227                level_type: LevelType::Nonoverlapping,
1228                table_infos: vec![
1229                    generate_table(2, 1, 0, 100, 1),
1230                    generate_table(3, 1, 101, 200, 1),
1231                    generate_table(4, 1, 222, 300, 1),
1232                ],
1233                ..Default::default()
1234            },
1235        ];
1236        assert_eq!(levels.len(), 4);
1237        let levels = Levels {
1238            levels,
1239            l0,
1240            ..Default::default()
1241        };
1242        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1243        let mut local_stats = LocalSelectorStatistic::default();
1244
1245        // pick_l0_to_sub_level
1246        {
1247            let option = ManualCompactionOption {
1248                sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1249                key_range: KeyRange {
1250                    left: Bytes::default(),
1251                    right: Bytes::default(),
1252                    right_exclusive: false,
1253                },
1254                internal_table_id: HashSet::default(),
1255                level: 0,
1256                exclusive: false,
1257            };
1258            let mut selector = ManualCompactionSelector::new(option);
1259            let task = selector
1260                .pick_compaction(
1261                    1,
1262                    compaction_selector_context(
1263                        &group_config,
1264                        &levels,
1265                        &BTreeSet::new(),
1266                        &mut levels_handler,
1267                        &mut local_stats,
1268                        &HashMap::default(),
1269                        Arc::new(CompactionDeveloperConfig::default()),
1270                        &Default::default(),
1271                        &HummockVersionStateTableInfo::empty(),
1272                    ),
1273                )
1274                .unwrap();
1275            assert_compaction_task(&task, &levels_handler);
1276            assert_eq!(task.input.input_levels.len(), 2);
1277            assert_eq!(task.input.input_levels[0].level_idx, 0);
1278            assert_eq!(task.input.input_levels[1].level_idx, 0);
1279            assert_eq!(task.input.target_level, 0);
1280        }
1281
1282        for level_handler in &mut levels_handler {
1283            for pending_task_id in &level_handler.pending_tasks_ids() {
1284                level_handler.remove_task(*pending_task_id);
1285            }
1286        }
1287
1288        // pick_l0_to_base_level
1289        {
1290            let option = ManualCompactionOption {
1291                sst_ids: vec![],
1292                key_range: KeyRange {
1293                    left: Bytes::default(),
1294                    right: Bytes::default(),
1295                    right_exclusive: false,
1296                },
1297                internal_table_id: HashSet::default(),
1298                level: 0,
1299                exclusive: false,
1300            };
1301            let mut selector = ManualCompactionSelector::new(option);
1302            let task = selector
1303                .pick_compaction(
1304                    2,
1305                    compaction_selector_context(
1306                        &group_config,
1307                        &levels,
1308                        &BTreeSet::new(),
1309                        &mut levels_handler,
1310                        &mut local_stats,
1311                        &HashMap::default(),
1312                        Arc::new(CompactionDeveloperConfig::default()),
1313                        &Default::default(),
1314                        &HummockVersionStateTableInfo::empty(),
1315                    ),
1316                )
1317                .unwrap();
1318            assert_compaction_task(&task, &levels_handler);
1319            assert_eq!(task.input.input_levels.len(), 3);
1320            assert_eq!(task.input.input_levels[0].level_idx, 0);
1321            assert_eq!(task.input.input_levels[1].level_idx, 0);
1322            assert_eq!(task.input.input_levels[2].level_idx, 4);
1323            assert_eq!(task.input.target_level, 4);
1324        }
1325    }
1326
1327    /// tests `DynamicLevelSelector::manual_pick_compaction`
1328    #[test]
1329    fn test_manual_compaction_selector() {
1330        let config = CompactionConfigBuilder::new().max_level(4).build();
1331        let group_config = CompactionGroup::new(1, config);
1332        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
1333        assert_eq!(l0.sub_levels.len(), 0);
1334        let levels = vec![
1335            generate_level(1, vec![]),
1336            generate_level(2, vec![]),
1337            generate_level(
1338                3,
1339                vec![
1340                    generate_table(0, 1, 150, 151, 1),
1341                    generate_table(1, 1, 250, 251, 1),
1342                ],
1343            ),
1344            Level {
1345                level_idx: 4,
1346                level_type: LevelType::Nonoverlapping,
1347                table_infos: vec![
1348                    generate_table(2, 1, 0, 100, 1),
1349                    generate_table(3, 1, 101, 200, 1),
1350                    generate_table(4, 1, 222, 300, 1),
1351                    generate_table(5, 1, 333, 400, 1),
1352                    generate_table(6, 1, 444, 500, 1),
1353                    generate_table(7, 1, 555, 600, 1),
1354                ],
1355                ..Default::default()
1356            },
1357        ];
1358        assert_eq!(levels.len(), 4);
1359        let levels = Levels {
1360            levels,
1361            l0,
1362            ..Default::default()
1363        };
1364        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1365        let mut local_stats = LocalSelectorStatistic::default();
1366
1367        // pick l3 -> l4
1368        {
1369            let option = ManualCompactionOption {
1370                sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1371                key_range: KeyRange {
1372                    left: Bytes::default(),
1373                    right: Bytes::default(),
1374                    right_exclusive: false,
1375                },
1376                internal_table_id: HashSet::default(),
1377                level: 3,
1378                exclusive: false,
1379            };
1380            let mut selector = ManualCompactionSelector::new(option);
1381            let task = selector
1382                .pick_compaction(
1383                    1,
1384                    compaction_selector_context(
1385                        &group_config,
1386                        &levels,
1387                        &BTreeSet::new(),
1388                        &mut levels_handler,
1389                        &mut local_stats,
1390                        &HashMap::default(),
1391                        Arc::new(CompactionDeveloperConfig::default()),
1392                        &Default::default(),
1393                        &HummockVersionStateTableInfo::empty(),
1394                    ),
1395                )
1396                .unwrap();
1397            assert_compaction_task(&task, &levels_handler);
1398            assert_eq!(task.input.input_levels.len(), 2);
1399            assert_eq!(task.input.input_levels[0].level_idx, 3);
1400            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
1401            assert_eq!(task.input.input_levels[1].level_idx, 4);
1402            assert_eq!(task.input.input_levels[1].table_infos.len(), 2);
1403            assert_eq!(task.input.target_level, 4);
1404        }
1405
1406        for level_handler in &mut levels_handler {
1407            for pending_task_id in &level_handler.pending_tasks_ids() {
1408                level_handler.remove_task(*pending_task_id);
1409            }
1410        }
1411
1412        // pick l4 -> l4
1413        {
1414            let option = ManualCompactionOption {
1415                sst_ids: vec![],
1416                key_range: KeyRange {
1417                    left: Bytes::default(),
1418                    right: Bytes::default(),
1419                    right_exclusive: false,
1420                },
1421                internal_table_id: HashSet::default(),
1422                level: 4,
1423                exclusive: false,
1424            };
1425            let mut selector = ManualCompactionSelector::new(option);
1426            let task = selector
1427                .pick_compaction(
1428                    1,
1429                    compaction_selector_context(
1430                        &group_config,
1431                        &levels,
1432                        &BTreeSet::new(),
1433                        &mut levels_handler,
1434                        &mut local_stats,
1435                        &HashMap::default(),
1436                        Arc::new(CompactionDeveloperConfig::default()),
1437                        &Default::default(),
1438                        &HummockVersionStateTableInfo::empty(),
1439                    ),
1440                )
1441                .unwrap();
1442            assert_compaction_task(&task, &levels_handler);
1443            assert_eq!(task.input.input_levels.len(), 2);
1444            assert_eq!(task.input.input_levels[0].level_idx, 4);
1445            assert_eq!(task.input.input_levels[0].table_infos.len(), 6);
1446            assert_eq!(task.input.input_levels[1].level_idx, 4);
1447            assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
1448            assert_eq!(task.input.target_level, 4);
1449            assert!(matches!(
1450                task.compaction_task_type,
1451                compact_task::TaskType::Manual
1452            ));
1453        }
1454    }
1455}