risingwave_meta/hummock/compaction/picker/
manual_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
20use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
21use risingwave_pb::hummock::LevelType;
22
23use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
24use crate::hummock::compaction::overlap_strategy::{
25    OverlapInfo, OverlapStrategy, RangeOverlapInfo,
26};
27use crate::hummock::compaction::selector::ManualCompactionOption;
28use crate::hummock::level_handler::LevelHandler;
29
/// Compaction picker driven by a user-supplied [`ManualCompactionOption`].
pub struct ManualCompactionPicker {
    /// Strategy used to find SSTs whose key ranges overlap the selection.
    overlap_strategy: Arc<dyn OverlapStrategy>,
    /// Manual filters (input level, SST ids, key range, internal table ids).
    option: ManualCompactionOption,
    /// Requested output level. Not used as the output level when L0 sub-levels are compacted
    /// into L0 (that path always targets level 0).
    target_level: usize,
}
35
36impl ManualCompactionPicker {
37    pub fn new(
38        overlap_strategy: Arc<dyn OverlapStrategy>,
39        option: ManualCompactionOption,
40        target_level: usize,
41    ) -> Self {
42        Self {
43            overlap_strategy,
44            option,
45            target_level,
46        }
47    }
48
49    fn pick_l0_to_sub_level(
50        &self,
51        l0: &OverlappingLevel,
52        level_handlers: &[LevelHandler],
53    ) -> Option<CompactionInput> {
54        assert_eq!(self.option.level, 0);
55        let mut input_levels = vec![];
56        let mut sub_level_id = 0;
57        let mut start_idx = None;
58        let mut end_idx = None;
59        // Decides the range of sub levels as input.
60        // We need pick consecutive sub_levels. See #5217.
61        for (idx, level) in l0.sub_levels.iter().enumerate() {
62            if !self.filter_level_by_option(level) {
63                continue;
64            }
65            if level_handlers[0].is_level_pending_compact(level) {
66                return None;
67            }
68            // Pick this sub_level.
69            if start_idx.is_none() {
70                sub_level_id = level.sub_level_id;
71                start_idx = Some(idx as u64);
72                end_idx = start_idx;
73            } else {
74                end_idx = Some(idx as u64);
75            }
76        }
77        let (start_idx, end_idx) = match (start_idx, end_idx) {
78            (Some(start_idx), Some(end_idx)) => (start_idx, end_idx),
79            _ => {
80                return None;
81            }
82        };
83        // Construct input.
84        for level in l0
85            .sub_levels
86            .iter()
87            .skip(start_idx as usize)
88            .take((end_idx - start_idx + 1) as usize)
89        {
90            input_levels.push(InputLevel {
91                level_idx: 0,
92                level_type: level.level_type,
93                table_infos: level.table_infos.clone(),
94            });
95        }
96        if input_levels.is_empty() {
97            return None;
98        }
99        input_levels.reverse();
100        Some(CompactionInput {
101            input_levels,
102            target_level: 0,
103            target_sub_level_id: sub_level_id,
104            ..Default::default()
105        })
106    }
107
108    fn pick_l0_to_base_level(
109        &self,
110        levels: &Levels,
111        level_handlers: &[LevelHandler],
112    ) -> Option<CompactionInput> {
113        assert!(self.option.level == 0 && self.target_level > 0);
114        for l in 1..self.target_level {
115            assert!(levels.levels[l - 1].table_infos.is_empty());
116        }
117        let l0 = &levels.l0;
118        let mut input_levels = vec![];
119        let mut max_sub_level_idx = usize::MAX;
120        let mut info = self.overlap_strategy.create_overlap_info();
121        // Decides the range of sub levels as input.
122        for (idx, level) in l0.sub_levels.iter().enumerate() {
123            if !self.filter_level_by_option(level) {
124                continue;
125            }
126            if level_handlers[0].is_level_pending_compact(level) {
127                return None;
128            }
129
130            // Pick this sub_level.
131            max_sub_level_idx = idx;
132        }
133        if max_sub_level_idx == usize::MAX {
134            return None;
135        }
136        // Construct input.
137        for idx in 0..=max_sub_level_idx {
138            for table in &l0.sub_levels[idx].table_infos {
139                info.update(table);
140            }
141            input_levels.push(InputLevel {
142                level_idx: 0,
143                level_type: l0.sub_levels[idx].level_type,
144                table_infos: l0.sub_levels[idx].table_infos.clone(),
145            })
146        }
147        let target_input_ssts_range =
148            info.check_multiple_overlap(&levels.levels[self.target_level - 1].table_infos);
149        let target_input_ssts = if target_input_ssts_range.is_empty() {
150            vec![]
151        } else {
152            levels.levels[self.target_level - 1].table_infos[target_input_ssts_range].to_vec()
153        };
154        if target_input_ssts
155            .iter()
156            .any(|table| level_handlers[self.target_level].is_pending_compact(&table.sst_id))
157        {
158            return None;
159        }
160        if input_levels.is_empty() {
161            return None;
162        }
163        input_levels.reverse();
164        input_levels.push(InputLevel {
165            level_idx: self.target_level as u32,
166            level_type: LevelType::Nonoverlapping,
167            table_infos: target_input_ssts,
168        });
169
170        Some(CompactionInput {
171            input_levels,
172            target_level: self.target_level,
173            target_sub_level_id: 0,
174            ..Default::default()
175        })
176    }
177
178    /// Returns false if the given `sst` is rejected by filter defined by `option`.
179    /// Otherwise returns true.
180    fn filter_level_by_option(&self, level: &Level) -> bool {
181        let mut hint_sst_ids: HashSet<u64> = HashSet::new();
182        hint_sst_ids.extend(self.option.sst_ids.iter());
183        let tmp_sst_info = SstableInfoInner {
184            key_range: self.option.key_range.clone(),
185            ..Default::default()
186        }
187        .into();
188        if self
189            .overlap_strategy
190            .check_overlap_with_tables(&[tmp_sst_info], &level.table_infos)
191            .is_empty()
192        {
193            return false;
194        }
195        if !hint_sst_ids.is_empty()
196            && !level
197                .table_infos
198                .iter()
199                .any(|t| hint_sst_ids.contains(&t.sst_id))
200        {
201            return false;
202        }
203        if !self.option.internal_table_id.is_empty()
204            && !level.table_infos.iter().any(|sst_info| {
205                sst_info
206                    .table_ids
207                    .iter()
208                    .any(|t| self.option.internal_table_id.contains(t))
209            })
210        {
211            return false;
212        }
213        true
214    }
215}
216
217impl CompactionPicker for ManualCompactionPicker {
218    fn pick_compaction(
219        &mut self,
220        levels: &Levels,
221        level_handlers: &[LevelHandler],
222        _stats: &mut LocalPickerStatistic,
223    ) -> Option<CompactionInput> {
224        if self.option.level == 0 {
225            if !self.option.sst_ids.is_empty() {
226                return self.pick_l0_to_sub_level(&levels.l0, level_handlers);
227            } else if self.target_level > 0 {
228                return self.pick_l0_to_base_level(levels, level_handlers);
229            } else {
230                return None;
231            }
232        }
233        let mut hint_sst_ids: HashSet<u64> = HashSet::new();
234        hint_sst_ids.extend(self.option.sst_ids.iter());
235        let mut tmp_sst_info = SstableInfoInner::default();
236        let mut range_overlap_info = RangeOverlapInfo::default();
237        tmp_sst_info.key_range = self.option.key_range.clone();
238        range_overlap_info.update(&tmp_sst_info.into());
239        let level = self.option.level;
240        let target_level = self.target_level;
241        assert!(
242            self.option.level == self.target_level || self.option.level + 1 == self.target_level
243        );
244        // We either include all `select_input_ssts` as input, or return None.
245        let mut select_input_ssts: Vec<SstableInfo> = levels
246            .get_level(self.option.level)
247            .table_infos
248            .iter()
249            .filter(|sst_info| hint_sst_ids.is_empty() || hint_sst_ids.contains(&sst_info.sst_id))
250            .filter(|sst_info| range_overlap_info.check_overlap(sst_info))
251            .filter(|sst_info| {
252                if self.option.internal_table_id.is_empty() {
253                    return true;
254                }
255
256                // to filter sst_file by table_id
257                for table_id in &sst_info.table_ids {
258                    if self.option.internal_table_id.contains(table_id) {
259                        return true;
260                    }
261                }
262                false
263            })
264            .cloned()
265            .collect();
266        if select_input_ssts.is_empty() {
267            return None;
268        }
269        let target_input_ssts = if target_level == level {
270            // For intra level compaction, input SSTs must be consecutive.
271            let (left, _) = levels
272                .get_level(level)
273                .table_infos
274                .iter()
275                .find_position(|p| p.sst_id == select_input_ssts.first().unwrap().sst_id)
276                .unwrap();
277            let (right, _) = levels
278                .get_level(level)
279                .table_infos
280                .iter()
281                .find_position(|p| p.sst_id == select_input_ssts.last().unwrap().sst_id)
282                .unwrap();
283            select_input_ssts = levels.get_level(level).table_infos[left..=right].to_vec();
284            vec![]
285        } else {
286            self.overlap_strategy.check_base_level_overlap(
287                &select_input_ssts,
288                &levels.get_level(target_level).table_infos,
289            )
290        };
291        if select_input_ssts
292            .iter()
293            .any(|table| level_handlers[level].is_pending_compact(&table.sst_id))
294        {
295            return None;
296        }
297        if target_input_ssts
298            .iter()
299            .any(|table| level_handlers[target_level].is_pending_compact(&table.sst_id))
300        {
301            return None;
302        }
303
304        Some(CompactionInput {
305            select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
306            target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
307            total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
308            input_levels: vec![
309                InputLevel {
310                    level_idx: level as u32,
311                    level_type: levels.levels[level - 1].level_type,
312                    table_infos: select_input_ssts,
313                },
314                InputLevel {
315                    level_idx: target_level as u32,
316                    level_type: levels.levels[target_level - 1].level_type,
317                    table_infos: target_input_ssts,
318                },
319            ],
320            target_level,
321            ..Default::default()
322        })
323    }
324}
325
326#[cfg(test)]
327pub mod tests {
328    use std::collections::{BTreeSet, HashMap};
329
330    use bytes::Bytes;
331    use risingwave_hummock_sdk::key_range::KeyRange;
332    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
333    use risingwave_pb::hummock::compact_task;
334
335    use super::*;
336    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
337    use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
338    use crate::hummock::compaction::selector::tests::{
339        assert_compaction_task, generate_l0_nonoverlapping_sublevels,
340        generate_l0_overlapping_sublevels, generate_level, generate_table,
341    };
342    use crate::hummock::compaction::selector::{CompactionSelector, ManualCompactionSelector};
343    use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
344    use crate::hummock::model::CompactionGroup;
345    use crate::hummock::test_utils::{compaction_selector_context, iterator_test_key_of_epoch};
346
347    fn clean_task_state(level_handler: &mut LevelHandler) {
348        for pending_task_id in &level_handler.pending_tasks_ids() {
349            level_handler.remove_task(*pending_task_id);
350        }
351    }
352
353    fn is_l0_to_lbase(compaction_input: &CompactionInput) -> bool {
354        compaction_input
355            .input_levels
356            .iter()
357            .take(compaction_input.input_levels.len() - 1)
358            .all(|i| i.level_idx == 0)
359            && compaction_input
360                .input_levels
361                .iter()
362                .last()
363                .unwrap()
364                .level_idx as usize
365                == compaction_input.target_level
366            && compaction_input.target_level > 0
367    }
368
369    fn is_l0_to_l0(compaction_input: &CompactionInput) -> bool {
370        compaction_input
371            .input_levels
372            .iter()
373            .all(|i| i.level_idx == 0)
374            && compaction_input.target_level == 0
375    }
376
    /// Exercises the non-L0 path of `pick_compaction` (level 1 → level 2) with different
    /// option filters: key range only, default option, internal table id only, and key
    /// range combined with internal table id.
    #[test]
    fn test_manual_compaction_picker() {
        // Two non-L0 levels; L0 has no sub-levels.
        let levels = vec![
            Level {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(0, 1, 0, 100, 1),
                    generate_table(1, 1, 101, 200, 1),
                    generate_table(2, 1, 222, 300, 1),
                ],
                ..Default::default()
            },
            Level {
                level_idx: 2,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(4, 1, 0, 100, 1),
                    generate_table(5, 1, 101, 150, 1),
                    generate_table(6, 1, 151, 201, 1),
                    generate_table(7, 1, 501, 800, 1),
                    generate_table(8, 2, 301, 400, 1),
                ],
                ..Default::default()
            },
        ];
        let mut levels = Levels {
            levels,
            l0: generate_l0_nonoverlapping_sublevels(vec![]),
            ..Default::default()
        };
        // One handler per level: L0, L1 and L2.
        let mut levels_handler = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
        ];
        let mut local_stats = LocalPickerStatistic::default();

        {
            // test key_range option
            let option = ManualCompactionOption {
                level: 1,
                key_range: KeyRange {
                    left: Bytes::from(iterator_test_key_of_epoch(1, 0, 1)),
                    right: Bytes::from(iterator_test_key_of_epoch(1, 201, 1)),
                    right_exclusive: false,
                },
                ..Default::default()
            };

            let target_level = option.level + 1;
            let mut picker = ManualCompactionPicker::new(
                Arc::new(RangeOverlapStrategy::default()),
                option,
                target_level,
            );
            let result = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();
            // Register the picked input with the handlers; it is cleared again at the start
            // of the next scenario.
            result.add_pending_task(0, &mut levels_handler);

            // input_levels[0] is the selected level, input_levels[1] the target level.
            assert_eq!(2, result.input_levels[0].table_infos.len());
            assert_eq!(3, result.input_levels[1].table_infos.len());
        }

        {
            // Drop the pending state left behind by the previous scenario.
            clean_task_state(&mut levels_handler[1]);
            clean_task_state(&mut levels_handler[2]);

            // test all key range
            let option = ManualCompactionOption::default();
            let target_level = option.level + 1;
            let mut picker = ManualCompactionPicker::new(
                Arc::new(RangeOverlapStrategy::default()),
                option,
                target_level,
            );
            let result = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();
            result.add_pending_task(0, &mut levels_handler);

            assert_eq!(3, result.input_levels[0].table_infos.len());
            assert_eq!(3, result.input_levels[1].table_infos.len());
        }

        {
            clean_task_state(&mut levels_handler[1]);
            clean_task_state(&mut levels_handler[2]);

            // Give only the second SST of level 1 the table ids [1, 2].
            let level_table_info = &mut levels.levels[0].table_infos;
            let table_info_1 = &mut level_table_info[1];
            let mut t_inner = table_info_1.get_inner();
            t_inner.table_ids.resize(2, 0);
            t_inner.table_ids[0] = 1;
            t_inner.table_ids[1] = 2;
            *table_info_1 = t_inner.into();

            // test internal_table_id
            let option = ManualCompactionOption {
                level: 1,
                internal_table_id: HashSet::from([2]),
                ..Default::default()
            };

            let target_level = option.level + 1;
            let mut picker = ManualCompactionPicker::new(
                Arc::new(RangeOverlapStrategy::default()),
                option,
                target_level,
            );

            let result = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();
            result.add_pending_task(0, &mut levels_handler);

            assert_eq!(1, result.input_levels[0].table_infos.len());
            assert_eq!(2, result.input_levels[1].table_infos.len());
        }

        {
            clean_task_state(&mut levels_handler[1]);
            clean_task_state(&mut levels_handler[2]);

            // include all table_info
            let level_table_info = &mut levels.levels[0].table_infos;
            for table_info in level_table_info {
                let mut t_inner = table_info.get_inner();
                t_inner.table_ids.resize(2, 0);
                t_inner.table_ids[0] = 1;
                t_inner.table_ids[1] = 2;
                *table_info = t_inner.into();
            }

            // test key range filter first
            let option = ManualCompactionOption {
                sst_ids: vec![],
                level: 1,
                key_range: KeyRange {
                    left: Bytes::from(iterator_test_key_of_epoch(1, 101, 1)),
                    right: Bytes::from(iterator_test_key_of_epoch(1, 199, 1)),
                    right_exclusive: false,
                },
                internal_table_id: HashSet::from([2]),
            };

            let target_level = option.level + 1;
            let mut picker = ManualCompactionPicker::new(
                Arc::new(RangeOverlapStrategy::default()),
                option,
                target_level,
            );

            let result = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();

            // Every level-1 SST now carries table id 2, so only the key range narrows the
            // selection.
            assert_eq!(1, result.input_levels[0].table_infos.len());
            assert_eq!(2, result.input_levels[1].table_infos.len());
        }
    }
539
540    fn generate_test_levels() -> (Levels, Vec<LevelHandler>) {
541        let mut l0 = generate_l0_overlapping_sublevels(vec![
542            vec![
543                generate_table(5, 1, 0, 500, 2),
544                generate_table(6, 2, 600, 1000, 2),
545            ],
546            vec![
547                generate_table(7, 1, 0, 500, 3),
548                generate_table(8, 2, 600, 1000, 3),
549            ],
550            vec![
551                generate_table(9, 1, 300, 500, 4),
552                generate_table(10, 2, 600, 1000, 4),
553            ],
554        ]);
555        // Set a nonoverlapping sub_level.
556        l0.sub_levels[1].level_type = LevelType::Nonoverlapping as _;
557        assert_eq!(l0.sub_levels.len(), 3);
558        let mut levels = vec![
559            Level {
560                level_idx: 1,
561                level_type: LevelType::Nonoverlapping,
562                table_infos: vec![
563                    generate_table(3, 1, 0, 100, 1),
564                    generate_table(4, 2, 2000, 3000, 1),
565                ],
566                ..Default::default()
567            },
568            Level {
569                level_idx: 2,
570                level_type: LevelType::Nonoverlapping,
571                table_infos: vec![
572                    generate_table(1, 1, 0, 100, 1),
573                    generate_table(2, 2, 2000, 3000, 1),
574                ],
575                ..Default::default()
576            },
577        ];
578        // Set internal_table_ids.
579        assert_eq!(levels.len(), 2);
580        for iter in [l0.sub_levels.iter_mut(), levels.iter_mut()] {
581            for (idx, l) in iter.enumerate() {
582                for t in &mut l.table_infos {
583                    let mut t_inner = t.get_inner();
584                    t_inner.table_ids.clear();
585                    if idx == 0 {
586                        t_inner.table_ids.push(((t.sst_id % 2) + 1) as _);
587                    } else {
588                        t_inner.table_ids.push(3);
589                    }
590                    *t = t_inner.into();
591                }
592            }
593        }
594        let levels = Levels {
595            levels,
596            l0,
597            ..Default::default()
598        };
599
600        let levels_handler = vec![
601            LevelHandler::new(0),
602            LevelHandler::new(1),
603            LevelHandler::new(2),
604        ];
605        (levels, levels_handler)
606    }
607
608    fn generate_intra_test_levels() -> (Levels, Vec<LevelHandler>) {
609        let l0 = generate_l0_overlapping_sublevels(vec![]);
610        let levels = vec![Level {
611            level_idx: 1,
612            level_type: LevelType::Nonoverlapping,
613            table_infos: vec![
614                generate_table(1, 1, 0, 100, 1),
615                generate_table(2, 2, 100, 200, 1),
616                generate_table(3, 2, 200, 300, 1),
617                generate_table(4, 2, 300, 400, 1),
618            ],
619            ..Default::default()
620        }];
621        let levels = Levels {
622            levels,
623            l0,
624            ..Default::default()
625        };
626
627        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
628        (levels, levels_handler)
629    }
630
631    #[test]
632    fn test_l0_empty() {
633        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
634        let levels = vec![Level {
635            level_idx: 1,
636            level_type: LevelType::Nonoverlapping,
637            table_infos: vec![],
638            total_file_size: 0,
639            sub_level_id: 0,
640            uncompressed_file_size: 0,
641            ..Default::default()
642        }];
643        let levels = Levels {
644            levels,
645            l0,
646            ..Default::default()
647        };
648        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
649        let option = ManualCompactionOption {
650            sst_ids: vec![1],
651            level: 0,
652            key_range: KeyRange {
653                left: Bytes::default(),
654                right: Bytes::default(),
655                right_exclusive: false,
656            },
657            internal_table_id: HashSet::default(),
658        };
659        let mut picker =
660            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 0);
661        assert!(
662            picker
663                .pick_compaction(
664                    &levels,
665                    &levels_handler,
666                    &mut LocalPickerStatistic::default()
667                )
668                .is_none()
669        );
670    }
671
672    #[test]
673    fn test_l0_basic() {
674        let (levels, levels_handler) = generate_test_levels();
675
676        // target_level == 0, None
677        let option = ManualCompactionOption {
678            sst_ids: vec![],
679            level: 0,
680            key_range: KeyRange {
681                left: Bytes::default(),
682                right: Bytes::default(),
683                right_exclusive: false,
684            },
685            internal_table_id: HashSet::default(),
686        };
687        let mut picker = ManualCompactionPicker::new(
688            Arc::new(RangeOverlapStrategy::default()),
689            option.clone(),
690            0,
691        );
692        let mut local_stats = LocalPickerStatistic::default();
693        assert!(
694            picker
695                .pick_compaction(&levels, &levels_handler, &mut local_stats)
696                .is_none()
697        );
698
699        // pick_l0_to_base_level
700        let mut picker =
701            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
702        let mut expected = [vec![5, 6], vec![7, 8], vec![9, 10]];
703        expected.reverse();
704        let result = picker
705            .pick_compaction(&levels, &levels_handler, &mut local_stats)
706            .unwrap();
707        assert_eq!(result.input_levels.len(), 4);
708        assert!(is_l0_to_lbase(&result));
709        assert_eq!(result.target_level, 1);
710        for (l, e) in expected.iter().enumerate().take(3) {
711            assert_eq!(
712                result.input_levels[l]
713                    .table_infos
714                    .iter()
715                    .map(|s| s.sst_id)
716                    .collect_vec(),
717                *e
718            );
719        }
720        assert_eq!(
721            result.input_levels[3].table_infos,
722            vec![levels.levels[0].table_infos[0].clone()]
723        );
724
725        // pick_l0_to_base_level, filtered by key_range
726        let option = ManualCompactionOption {
727            sst_ids: vec![],
728            level: 0,
729            key_range: KeyRange {
730                left: Bytes::from(iterator_test_key_of_epoch(1, 0, 2)),
731                right: Bytes::from(iterator_test_key_of_epoch(1, 200, 2)),
732                right_exclusive: false,
733            },
734            internal_table_id: HashSet::default(),
735        };
736        let mut picker =
737            ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
738        let mut expected = [vec![5, 6], vec![7, 8]];
739        expected.reverse();
740        let result = picker
741            .pick_compaction(&levels, &levels_handler, &mut local_stats)
742            .unwrap();
743        assert_eq!(result.input_levels.len(), 3);
744        assert!(is_l0_to_lbase(&result));
745        assert_eq!(result.target_level, 1);
746        for (l, e) in expected.iter().enumerate().take(2) {
747            assert_eq!(
748                result.input_levels[l]
749                    .table_infos
750                    .iter()
751                    .map(|s| s.sst_id)
752                    .collect_vec(),
753                *e
754            );
755        }
756        assert_eq!(
757            result.input_levels[2].table_infos,
758            vec![levels.levels[0].table_infos[0].clone()]
759        );
760    }
761
762    #[test]
763    fn test_l0_to_l0_option_sst_ids() {
764        let (levels, levels_handler) = generate_test_levels();
765        // (input_level, sst_id_filter, expected_result_input_level_ssts)
766        let sst_id_filters = vec![
767            (0, vec![6], vec![vec![5, 6]]),
768            (0, vec![7], vec![vec![7, 8]]),
769            (0, vec![9], vec![vec![9, 10]]),
770            (0, vec![6, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
771            (0, vec![8, 9], vec![vec![7, 8], vec![9, 10]]),
772            (0, vec![6, 8, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
773        ];
774        let mut local_stats = LocalPickerStatistic::default();
775        for (input_level, sst_id_filter, expected) in &sst_id_filters {
776            let expected = expected.iter().rev().cloned().collect_vec();
777            let option = ManualCompactionOption {
778                sst_ids: sst_id_filter.clone(),
779                level: *input_level as _,
780                key_range: KeyRange {
781                    left: Bytes::default(),
782                    right: Bytes::default(),
783                    right_exclusive: false,
784                },
785                internal_table_id: HashSet::default(),
786            };
787            let mut picker = ManualCompactionPicker::new(
788                Arc::new(RangeOverlapStrategy::default()),
789                option.clone(),
790                // l0 to l0 will ignore target_level
791                input_level + 1,
792            );
793            let result = picker
794                .pick_compaction(&levels, &levels_handler, &mut local_stats)
795                .unwrap();
796            assert!(is_l0_to_l0(&result));
797            assert_eq!(result.input_levels.len(), expected.len());
798            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
799                assert_eq!(
800                    result.input_levels[i]
801                        .table_infos
802                        .iter()
803                        .map(|s| s.sst_id)
804                        .collect_vec(),
805                    *e
806                );
807            }
808        }
809    }
810
811    #[test]
812    fn test_l0_to_lbase_option_internal_table() {
813        let (levels, mut levels_handler) = generate_test_levels();
814        let input_level = 0;
815        let target_level = input_level + 1;
816        let mut local_stats = LocalPickerStatistic::default();
817        {
818            let option = ManualCompactionOption {
819                sst_ids: vec![],
820                level: input_level,
821                key_range: KeyRange {
822                    left: Bytes::default(),
823                    right: Bytes::default(),
824                    right_exclusive: false,
825                },
826                // No matching internal table id.
827                internal_table_id: HashSet::from([100]),
828            };
829            let mut picker = ManualCompactionPicker::new(
830                Arc::new(RangeOverlapStrategy::default()),
831                option,
832                target_level,
833            );
834            assert!(
835                picker
836                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
837                    .is_none()
838            )
839        }
840
841        {
842            let option = ManualCompactionOption {
843                sst_ids: vec![],
844                level: input_level,
845                key_range: KeyRange {
846                    left: Bytes::default(),
847                    right: Bytes::default(),
848                    right_exclusive: false,
849                },
850                // Include all sub level's table ids
851                internal_table_id: HashSet::from([1, 2, 3]),
852            };
853            let mut picker = ManualCompactionPicker::new(
854                Arc::new(RangeOverlapStrategy::default()),
855                option,
856                target_level,
857            );
858            let result = picker
859                .pick_compaction(&levels, &levels_handler, &mut local_stats)
860                .unwrap();
861            assert_eq!(result.input_levels.len(), 4);
862            assert!(is_l0_to_lbase(&result));
863            assert_eq!(result.target_level, 1);
864            assert!(is_l0_to_lbase(&result));
865            assert_eq!(
866                result
867                    .input_levels
868                    .iter()
869                    .take(3)
870                    .flat_map(|s| s.table_infos.clone())
871                    .map(|s| s.sst_id)
872                    .collect_vec(),
873                vec![9, 10, 7, 8, 5, 6]
874            );
875            assert_eq!(
876                result.input_levels[3]
877                    .table_infos
878                    .iter()
879                    .map(|s| s.sst_id)
880                    .collect_vec(),
881                vec![3]
882            );
883        }
884
885        {
886            let option = ManualCompactionOption {
887                sst_ids: vec![],
888                level: input_level,
889                key_range: KeyRange {
890                    left: Bytes::default(),
891                    right: Bytes::default(),
892                    right_exclusive: false,
893                },
894                // Only include bottom sub level's table id
895                internal_table_id: HashSet::from([3]),
896            };
897            let mut picker = ManualCompactionPicker::new(
898                Arc::new(RangeOverlapStrategy::default()),
899                option,
900                target_level,
901            );
902            let result = picker
903                .pick_compaction(&levels, &levels_handler, &mut local_stats)
904                .unwrap();
905            assert_eq!(result.input_levels.len(), 4);
906            assert!(is_l0_to_lbase(&result));
907            assert_eq!(
908                result
909                    .input_levels
910                    .iter()
911                    .take(3)
912                    .flat_map(|s| s.table_infos.clone())
913                    .map(|s| s.sst_id)
914                    .collect_vec(),
915                vec![9, 10, 7, 8, 5, 6]
916            );
917            assert_eq!(
918                result.input_levels[3]
919                    .table_infos
920                    .iter()
921                    .map(|s| s.sst_id)
922                    .collect_vec(),
923                vec![3]
924            );
925            assert_eq!(result.target_level, 1);
926        }
927
928        {
929            let option = ManualCompactionOption {
930                sst_ids: vec![],
931                level: input_level,
932                key_range: KeyRange {
933                    left: Bytes::default(),
934                    right: Bytes::default(),
935                    right_exclusive: false,
936                },
937                // Only include partial top sub level's table id, but the whole top sub level is
938                // picked.
939                internal_table_id: HashSet::from([1]),
940            };
941            let mut picker = ManualCompactionPicker::new(
942                Arc::new(RangeOverlapStrategy::default()),
943                option,
944                target_level,
945            );
946            let result = picker
947                .pick_compaction(&levels, &levels_handler, &mut local_stats)
948                .unwrap();
949            result.add_pending_task(0, &mut levels_handler);
950            assert_eq!(result.input_levels.len(), 2);
951            assert!(is_l0_to_lbase(&result));
952            assert_eq!(result.target_level, 1);
953            assert_eq!(
954                result
955                    .input_levels
956                    .iter()
957                    .take(1)
958                    .flat_map(|s| s.table_infos.clone())
959                    .map(|s| s.sst_id)
960                    .collect_vec(),
961                vec![5, 6]
962            );
963            assert_eq!(
964                result.input_levels[1]
965                    .table_infos
966                    .iter()
967                    .map(|s| s.sst_id)
968                    .collect_vec(),
969                vec![3]
970            );
971
972            // Pick bottom sub level while top sub level is pending
973            let option = ManualCompactionOption {
974                sst_ids: vec![],
975                level: input_level,
976                key_range: KeyRange {
977                    left: Bytes::default(),
978                    right: Bytes::default(),
979                    right_exclusive: false,
980                },
981                // Only include bottom sub level's table id
982                internal_table_id: HashSet::from([3]),
983            };
984            let mut picker = ManualCompactionPicker::new(
985                Arc::new(RangeOverlapStrategy::default()),
986                option,
987                target_level,
988            );
989            // Because top sub-level is pending.
990            assert!(
991                picker
992                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
993                    .is_none()
994            );
995
996            clean_task_state(&mut levels_handler[0]);
997            clean_task_state(&mut levels_handler[1]);
998        }
999    }
1000
1001    #[test]
1002    fn test_ln_to_lnext_option_internal_table() {
1003        let (levels, levels_handler) = generate_test_levels();
1004        let input_level = 1;
1005        let target_level = input_level + 1;
1006        let mut local_stats = LocalPickerStatistic::default();
1007        {
1008            let option = ManualCompactionOption {
1009                sst_ids: vec![],
1010                level: input_level,
1011                key_range: KeyRange {
1012                    left: Bytes::default(),
1013                    right: Bytes::default(),
1014                    right_exclusive: false,
1015                },
1016                // No matching internal table id.
1017                internal_table_id: HashSet::from([100]),
1018            };
1019            let mut picker = ManualCompactionPicker::new(
1020                Arc::new(RangeOverlapStrategy::default()),
1021                option,
1022                target_level,
1023            );
1024            assert!(
1025                picker
1026                    .pick_compaction(&levels, &levels_handler, &mut local_stats)
1027                    .is_none()
1028            )
1029        }
1030
1031        {
1032            let expected_input_level_sst_ids = [vec![4], vec![2]];
1033            let option = ManualCompactionOption {
1034                sst_ids: vec![],
1035                level: input_level,
1036                key_range: KeyRange {
1037                    left: Bytes::default(),
1038                    right: Bytes::default(),
1039                    right_exclusive: false,
1040                },
1041                // Only include partial input level's table id
1042                internal_table_id: HashSet::from([1]),
1043            };
1044            let mut picker = ManualCompactionPicker::new(
1045                Arc::new(RangeOverlapStrategy::default()),
1046                option,
1047                target_level,
1048            );
1049            let result = picker
1050                .pick_compaction(&levels, &levels_handler, &mut local_stats)
1051                .unwrap();
1052            assert_eq!(
1053                result.input_levels.len(),
1054                expected_input_level_sst_ids.len()
1055            );
1056            assert_eq!(result.target_level, target_level);
1057            for (l, e) in expected_input_level_sst_ids
1058                .iter()
1059                .enumerate()
1060                .take(result.input_levels.len())
1061            {
1062                assert_eq!(
1063                    result.input_levels[l]
1064                        .table_infos
1065                        .iter()
1066                        .map(|s| s.sst_id)
1067                        .collect_vec(),
1068                    *e
1069                );
1070            }
1071        }
1072    }
1073
1074    #[test]
1075    fn test_ln_to_lnext_option_sst_ids() {
1076        let (levels, levels_handler) = generate_test_levels();
1077        // (input_level, sst_id_filter, expected_result_input_level_ssts)
1078        let sst_id_filters = vec![
1079            (1, vec![3], vec![vec![3], vec![1]]),
1080            (1, vec![4], vec![vec![4], vec![2]]),
1081            (1, vec![3, 4], vec![vec![3, 4], vec![1, 2]]),
1082        ];
1083        let mut local_stats = LocalPickerStatistic::default();
1084        for (input_level, sst_id_filter, expected) in &sst_id_filters {
1085            let option = ManualCompactionOption {
1086                sst_ids: sst_id_filter.clone(),
1087                level: *input_level as _,
1088                key_range: KeyRange {
1089                    left: Bytes::default(),
1090                    right: Bytes::default(),
1091                    right_exclusive: false,
1092                },
1093                internal_table_id: HashSet::default(),
1094            };
1095            let mut picker = ManualCompactionPicker::new(
1096                Arc::new(RangeOverlapStrategy::default()),
1097                option.clone(),
1098                input_level + 1,
1099            );
1100            let result = picker
1101                .pick_compaction(&levels, &levels_handler, &mut local_stats)
1102                .unwrap();
1103            assert_eq!(result.input_levels.len(), expected.len());
1104            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1105                assert_eq!(
1106                    result.input_levels[i]
1107                        .table_infos
1108                        .iter()
1109                        .map(|s| s.sst_id)
1110                        .collect_vec(),
1111                    *e
1112                );
1113            }
1114        }
1115    }
1116
1117    #[test]
1118    fn test_ln_to_ln() {
1119        let (levels, levels_handler) = generate_intra_test_levels();
1120        // (input_level, sst_id_filter, expected_result_input_level_ssts)
1121        let sst_id_filters = vec![
1122            (1, vec![1], vec![vec![1], vec![]]),
1123            (1, vec![3], vec![vec![3], vec![]]),
1124            (1, vec![4], vec![vec![4], vec![]]),
1125            (1, vec![3, 4], vec![vec![3, 4], vec![]]),
1126            (1, vec![1, 4], vec![vec![1, 2, 3, 4], vec![]]),
1127            (1, vec![2, 4], vec![vec![2, 3, 4], vec![]]),
1128            (1, vec![1, 3], vec![vec![1, 2, 3], vec![]]),
1129        ];
1130        for (input_level, sst_id_filter, expected) in &sst_id_filters {
1131            let option = ManualCompactionOption {
1132                sst_ids: sst_id_filter.clone(),
1133                level: *input_level as _,
1134                key_range: KeyRange {
1135                    left: Bytes::default(),
1136                    right: Bytes::default(),
1137                    right_exclusive: false,
1138                },
1139                internal_table_id: HashSet::default(),
1140            };
1141            let mut picker = ManualCompactionPicker::new(
1142                Arc::new(RangeOverlapStrategy::default()),
1143                option.clone(),
1144                *input_level as _,
1145            );
1146            let result = picker
1147                .pick_compaction(
1148                    &levels,
1149                    &levels_handler,
1150                    &mut LocalPickerStatistic::default(),
1151                )
1152                .unwrap();
1153            assert_eq!(result.input_levels.len(), expected.len());
1154            for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1155                assert_eq!(
1156                    result.input_levels[i]
1157                        .table_infos
1158                        .iter()
1159                        .map(|s| s.sst_id)
1160                        .collect_vec(),
1161                    *e
1162                );
1163            }
1164        }
1165    }
1166
1167    #[test]
1168    fn test_manual_compaction_selector_l0() {
1169        let config = CompactionConfigBuilder::new().max_level(4).build();
1170        let group_config = CompactionGroup::new(1, config);
1171        let l0 = generate_l0_nonoverlapping_sublevels(vec![
1172            generate_table(0, 1, 0, 500, 1),
1173            generate_table(1, 1, 0, 500, 1),
1174        ]);
1175        assert_eq!(l0.sub_levels.len(), 2);
1176        let levels = vec![
1177            generate_level(1, vec![]),
1178            generate_level(2, vec![]),
1179            generate_level(3, vec![]),
1180            Level {
1181                level_idx: 4,
1182                level_type: LevelType::Nonoverlapping,
1183                table_infos: vec![
1184                    generate_table(2, 1, 0, 100, 1),
1185                    generate_table(3, 1, 101, 200, 1),
1186                    generate_table(4, 1, 222, 300, 1),
1187                ],
1188                ..Default::default()
1189            },
1190        ];
1191        assert_eq!(levels.len(), 4);
1192        let levels = Levels {
1193            levels,
1194            l0,
1195            ..Default::default()
1196        };
1197        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1198        let mut local_stats = LocalSelectorStatistic::default();
1199
1200        // pick_l0_to_sub_level
1201        {
1202            let option = ManualCompactionOption {
1203                sst_ids: vec![0, 1],
1204                key_range: KeyRange {
1205                    left: Bytes::default(),
1206                    right: Bytes::default(),
1207                    right_exclusive: false,
1208                },
1209                internal_table_id: HashSet::default(),
1210                level: 0,
1211            };
1212            let mut selector = ManualCompactionSelector::new(option);
1213            let task = selector
1214                .pick_compaction(
1215                    1,
1216                    compaction_selector_context(
1217                        &group_config,
1218                        &levels,
1219                        &BTreeSet::new(),
1220                        &mut levels_handler,
1221                        &mut local_stats,
1222                        &HashMap::default(),
1223                        Arc::new(CompactionDeveloperConfig::default()),
1224                        &Default::default(),
1225                        &HummockVersionStateTableInfo::empty(),
1226                    ),
1227                )
1228                .unwrap();
1229            assert_compaction_task(&task, &levels_handler);
1230            assert_eq!(task.input.input_levels.len(), 2);
1231            assert_eq!(task.input.input_levels[0].level_idx, 0);
1232            assert_eq!(task.input.input_levels[1].level_idx, 0);
1233            assert_eq!(task.input.target_level, 0);
1234        }
1235
1236        for level_handler in &mut levels_handler {
1237            for pending_task_id in &level_handler.pending_tasks_ids() {
1238                level_handler.remove_task(*pending_task_id);
1239            }
1240        }
1241
1242        // pick_l0_to_base_level
1243        {
1244            let option = ManualCompactionOption {
1245                sst_ids: vec![],
1246                key_range: KeyRange {
1247                    left: Bytes::default(),
1248                    right: Bytes::default(),
1249                    right_exclusive: false,
1250                },
1251                internal_table_id: HashSet::default(),
1252                level: 0,
1253            };
1254            let mut selector = ManualCompactionSelector::new(option);
1255            let task = selector
1256                .pick_compaction(
1257                    2,
1258                    compaction_selector_context(
1259                        &group_config,
1260                        &levels,
1261                        &BTreeSet::new(),
1262                        &mut levels_handler,
1263                        &mut local_stats,
1264                        &HashMap::default(),
1265                        Arc::new(CompactionDeveloperConfig::default()),
1266                        &Default::default(),
1267                        &HummockVersionStateTableInfo::empty(),
1268                    ),
1269                )
1270                .unwrap();
1271            assert_compaction_task(&task, &levels_handler);
1272            assert_eq!(task.input.input_levels.len(), 3);
1273            assert_eq!(task.input.input_levels[0].level_idx, 0);
1274            assert_eq!(task.input.input_levels[1].level_idx, 0);
1275            assert_eq!(task.input.input_levels[2].level_idx, 4);
1276            assert_eq!(task.input.target_level, 4);
1277        }
1278    }
1279
1280    /// tests `DynamicLevelSelector::manual_pick_compaction`
1281    #[test]
1282    fn test_manual_compaction_selector() {
1283        let config = CompactionConfigBuilder::new().max_level(4).build();
1284        let group_config = CompactionGroup::new(1, config);
1285        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
1286        assert_eq!(l0.sub_levels.len(), 0);
1287        let levels = vec![
1288            generate_level(1, vec![]),
1289            generate_level(2, vec![]),
1290            generate_level(
1291                3,
1292                vec![
1293                    generate_table(0, 1, 150, 151, 1),
1294                    generate_table(1, 1, 250, 251, 1),
1295                ],
1296            ),
1297            Level {
1298                level_idx: 4,
1299                level_type: LevelType::Nonoverlapping,
1300                table_infos: vec![
1301                    generate_table(2, 1, 0, 100, 1),
1302                    generate_table(3, 1, 101, 200, 1),
1303                    generate_table(4, 1, 222, 300, 1),
1304                    generate_table(5, 1, 333, 400, 1),
1305                    generate_table(6, 1, 444, 500, 1),
1306                    generate_table(7, 1, 555, 600, 1),
1307                ],
1308                ..Default::default()
1309            },
1310        ];
1311        assert_eq!(levels.len(), 4);
1312        let levels = Levels {
1313            levels,
1314            l0,
1315            ..Default::default()
1316        };
1317        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1318        let mut local_stats = LocalSelectorStatistic::default();
1319
1320        // pick l3 -> l4
1321        {
1322            let option = ManualCompactionOption {
1323                sst_ids: vec![0, 1],
1324                key_range: KeyRange {
1325                    left: Bytes::default(),
1326                    right: Bytes::default(),
1327                    right_exclusive: false,
1328                },
1329                internal_table_id: HashSet::default(),
1330                level: 3,
1331            };
1332            let mut selector = ManualCompactionSelector::new(option);
1333            let task = selector
1334                .pick_compaction(
1335                    1,
1336                    compaction_selector_context(
1337                        &group_config,
1338                        &levels,
1339                        &BTreeSet::new(),
1340                        &mut levels_handler,
1341                        &mut local_stats,
1342                        &HashMap::default(),
1343                        Arc::new(CompactionDeveloperConfig::default()),
1344                        &Default::default(),
1345                        &HummockVersionStateTableInfo::empty(),
1346                    ),
1347                )
1348                .unwrap();
1349            assert_compaction_task(&task, &levels_handler);
1350            assert_eq!(task.input.input_levels.len(), 2);
1351            assert_eq!(task.input.input_levels[0].level_idx, 3);
1352            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
1353            assert_eq!(task.input.input_levels[1].level_idx, 4);
1354            assert_eq!(task.input.input_levels[1].table_infos.len(), 2);
1355            assert_eq!(task.input.target_level, 4);
1356        }
1357
1358        for level_handler in &mut levels_handler {
1359            for pending_task_id in &level_handler.pending_tasks_ids() {
1360                level_handler.remove_task(*pending_task_id);
1361            }
1362        }
1363
1364        // pick l4 -> l4
1365        {
1366            let option = ManualCompactionOption {
1367                sst_ids: vec![],
1368                key_range: KeyRange {
1369                    left: Bytes::default(),
1370                    right: Bytes::default(),
1371                    right_exclusive: false,
1372                },
1373                internal_table_id: HashSet::default(),
1374                level: 4,
1375            };
1376            let mut selector = ManualCompactionSelector::new(option);
1377            let task = selector
1378                .pick_compaction(
1379                    1,
1380                    compaction_selector_context(
1381                        &group_config,
1382                        &levels,
1383                        &BTreeSet::new(),
1384                        &mut levels_handler,
1385                        &mut local_stats,
1386                        &HashMap::default(),
1387                        Arc::new(CompactionDeveloperConfig::default()),
1388                        &Default::default(),
1389                        &HummockVersionStateTableInfo::empty(),
1390                    ),
1391                )
1392                .unwrap();
1393            assert_compaction_task(&task, &levels_handler);
1394            assert_eq!(task.input.input_levels.len(), 2);
1395            assert_eq!(task.input.input_levels[0].level_idx, 4);
1396            assert_eq!(task.input.input_levels[0].table_infos.len(), 6);
1397            assert_eq!(task.input.input_levels[1].level_idx, 4);
1398            assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
1399            assert_eq!(task.input.target_level, 4);
1400            assert!(matches!(
1401                task.compaction_task_type,
1402                compact_task::TaskType::Manual
1403            ));
1404        }
1405    }
1406}