risingwave_meta/hummock/compaction/picker/
min_overlap_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_hummock_sdk::append_sstable_info_to_string;
18use risingwave_hummock_sdk::level::{InputLevel, Level, Levels};
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20use risingwave_pb::hummock::LevelType;
21
22use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
23use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
24use crate::hummock::level_handler::LevelHandler;
25
26pub struct MinOverlappingPicker {
27    level: usize,
28    target_level: usize,
29    max_select_bytes: u64,
30    vnode_partition_count: u32,
31    overlap_strategy: Arc<dyn OverlapStrategy>,
32}
33
34impl MinOverlappingPicker {
35    pub fn new(
36        level: usize,
37        target_level: usize,
38        max_select_bytes: u64,
39        vnode_partition_count: u32,
40        overlap_strategy: Arc<dyn OverlapStrategy>,
41    ) -> MinOverlappingPicker {
42        MinOverlappingPicker {
43            level,
44            target_level,
45            max_select_bytes,
46            vnode_partition_count,
47            overlap_strategy,
48        }
49    }
50
51    pub fn pick_tables(
52        &self,
53        select_tables: &[SstableInfo],
54        target_tables: &[SstableInfo],
55        level_handlers: &[LevelHandler],
56    ) -> (Vec<SstableInfo>, Vec<SstableInfo>) {
57        let mut select_file_ranges = vec![];
58        for (idx, sst) in select_tables.iter().enumerate() {
59            if level_handlers[self.level].is_pending_compact(&sst.sst_id) {
60                continue;
61            }
62            let mut overlap_info = self.overlap_strategy.create_overlap_info();
63            overlap_info.update(&sst.key_range);
64            let overlap_files_range = overlap_info.check_multiple_overlap(target_tables);
65
66            if overlap_files_range.is_empty() {
67                return (vec![sst.clone()], vec![]);
68            }
69            select_file_ranges.push((idx, overlap_files_range));
70        }
71        select_file_ranges.retain(|(_, range)| {
72            let mut pending_compact = false;
73            for other in &target_tables[range.clone()] {
74                if level_handlers[self.target_level].is_pending_compact(&other.sst_id) {
75                    pending_compact = true;
76                    break;
77                }
78            }
79            !pending_compact
80        });
81
82        let mut min_score = u64::MAX;
83        let mut min_score_select_range = 0..0;
84        let mut min_score_target_range = 0..0;
85        let mut min_score_select_file_size = 0;
86        for left in 0..select_file_ranges.len() {
87            let mut select_file_size = 0;
88            let mut target_level_overlap_range = select_file_ranges[left].1.clone();
89            let mut total_file_size = 0;
90            for other in &target_tables[target_level_overlap_range.clone()] {
91                total_file_size += other.sst_size;
92            }
93            let start_idx = select_file_ranges[left].0;
94            let mut end_idx = start_idx + 1;
95            for (idx, range) in select_file_ranges.iter().skip(left) {
96                if select_file_size > self.max_select_bytes
97                    || *idx > end_idx
98                    || range.start >= target_level_overlap_range.end
99                {
100                    break;
101                }
102                select_file_size += select_tables[*idx].sst_size;
103                if range.end > target_level_overlap_range.end {
104                    for other in &target_tables[target_level_overlap_range.end..range.end] {
105                        total_file_size += other.sst_size;
106                    }
107                    target_level_overlap_range.end = range.end;
108                }
109                let score = if select_file_size == 0 {
110                    total_file_size
111                } else {
112                    total_file_size * 100 / select_file_size
113                };
114                end_idx = idx + 1;
115                if score < min_score
116                    || (score == min_score && select_file_size < min_score_select_file_size)
117                {
118                    min_score = score;
119                    min_score_select_range = start_idx..end_idx;
120                    min_score_target_range = target_level_overlap_range.clone();
121                    min_score_select_file_size = select_file_size;
122                }
123            }
124        }
125        if min_score == u64::MAX {
126            return (vec![], vec![]);
127        }
128        let select_input_ssts = select_tables[min_score_select_range].to_vec();
129        let target_input_ssts = target_tables[min_score_target_range].to_vec();
130        (select_input_ssts, target_input_ssts)
131    }
132}
133
134impl CompactionPicker for MinOverlappingPicker {
135    fn pick_compaction(
136        &mut self,
137        levels: &Levels,
138        level_handlers: &[LevelHandler],
139        stats: &mut LocalPickerStatistic,
140    ) -> Option<CompactionInput> {
141        assert!(self.level > 0);
142        let (select_input_ssts, target_input_ssts) = self.pick_tables(
143            &levels.get_level(self.level).table_infos,
144            &levels.get_level(self.target_level).table_infos,
145            level_handlers,
146        );
147        if select_input_ssts.is_empty() {
148            stats.skip_by_pending_files += 1;
149            return None;
150        }
151        Some(CompactionInput {
152            select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
153            target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
154            total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
155            input_levels: vec![
156                InputLevel {
157                    level_idx: self.level as u32,
158                    level_type: LevelType::Nonoverlapping,
159                    table_infos: select_input_ssts,
160                },
161                InputLevel {
162                    level_idx: self.target_level as u32,
163                    level_type: LevelType::Nonoverlapping,
164                    table_infos: target_input_ssts,
165                },
166            ],
167            target_level: self.target_level,
168            vnode_partition_count: self.vnode_partition_count,
169            ..Default::default()
170        })
171    }
172}
173
174#[derive(Default, Debug)]
175pub struct SubLevelSstables {
176    pub total_file_size: u64,
177    pub total_file_count: usize,
178    pub sstable_infos: Vec<(u64, Vec<SstableInfo>)>,
179    pub expected: bool,
180}
181
182pub struct NonOverlapSubLevelPicker {
183    min_compaction_bytes: u64,
184    max_compaction_bytes: u64,
185    min_expected_level_count: usize,
186    max_file_count: u64,
187    overlap_strategy: Arc<dyn OverlapStrategy>,
188    enable_check_task_level_overlap: bool,
189    max_expected_level_count: usize,
190    enable_optimize_l0_interval_selection: bool,
191}
192
193impl NonOverlapSubLevelPicker {
194    pub fn new(
195        min_compaction_bytes: u64,
196        max_compaction_bytes: u64,
197        min_expected_level_count: usize,
198        max_file_count: u64,
199        overlap_strategy: Arc<dyn OverlapStrategy>,
200        enable_check_task_level_overlap: bool,
201        max_expected_level_count: usize,
202        enable_optimize_l0_interval_selection: bool,
203    ) -> Self {
204        Self {
205            min_compaction_bytes,
206            max_compaction_bytes,
207            min_expected_level_count,
208            max_file_count,
209            overlap_strategy,
210            enable_check_task_level_overlap,
211            max_expected_level_count,
212            enable_optimize_l0_interval_selection,
213        }
214    }
215
216    #[cfg(test)]
217    pub fn for_test(
218        min_compaction_bytes: u64,
219        max_compaction_bytes: u64,
220        min_expected_level_count: usize,
221        max_file_count: u64,
222        overlap_strategy: Arc<dyn OverlapStrategy>,
223        enable_check_task_level_overlap: bool,
224        max_expected_level_count: usize,
225        enable_optimize_l0_interval_selection: bool,
226    ) -> Self {
227        Self {
228            min_compaction_bytes,
229            max_compaction_bytes,
230            min_expected_level_count,
231            max_file_count,
232            overlap_strategy,
233            enable_check_task_level_overlap,
234            max_expected_level_count,
235            enable_optimize_l0_interval_selection,
236        }
237    }
238
239    // Use the incoming sst as the basic range and select as many sub levels as possible
240    // 1. Build basic range based on sst
241    // 2. Add a new sub level in each round
242    // 3. Expand sst according to the basic range from new to old sub levels.
243    // 4. According to the size and count restrictions, select the plan that contains the most sub levels as much as possible
244    fn pick_sub_level(
245        &self,
246        levels: &[Level],
247        level_handler: &LevelHandler,
248        sst: &SstableInfo,
249    ) -> Option<SubLevelSstables> {
250        let mut ret = SubLevelSstables::default();
251        for sub_level in levels {
252            ret.sstable_infos.push((sub_level.sub_level_id, vec![]));
253        }
254
255        let mut pick_levels_range = Vec::default();
256        let mut max_select_level_count = 0;
257
258        // Pay attention to the order here: Make sure to select the lowest sub_level to meet the requirements of base compaction. If you break the assumption of this order, you need to redesign it.
259        // TODO: Use binary selection to replace the step algorithm to optimize algorithm complexity
260        'expand_new_level: for (target_index, target_level) in levels.iter().enumerate() {
261            if target_level.level_type != LevelType::Nonoverlapping {
262                break;
263            }
264
265            if ret
266                .sstable_infos
267                .iter()
268                .filter(|(_sub_level_id, ssts)| !ssts.is_empty())
269                .count()
270                > self.max_expected_level_count
271            {
272                break;
273            }
274
275            // reset the `basic_overlap_info` with basic sst
276            let mut basic_overlap_info = self.overlap_strategy.create_overlap_info();
277            basic_overlap_info.update(&sst.key_range);
278
279            let mut overlap_files_range =
280                basic_overlap_info.check_multiple_include(&target_level.table_infos);
281            if overlap_files_range.is_empty() {
282                overlap_files_range =
283                    basic_overlap_info.check_multiple_overlap(&target_level.table_infos);
284            }
285
286            if overlap_files_range.is_empty() {
287                continue;
288            }
289
290            let mut overlap_levels = vec![];
291
292            let mut add_files_size: u64 = 0;
293            let mut add_files_count: usize = 0;
294
295            let mut select_level_count = 0;
296            for reverse_index in (0..=target_index).rev() {
297                let target_tables = &levels[reverse_index].table_infos;
298
299                overlap_files_range = if target_index == reverse_index {
300                    overlap_files_range
301                } else {
302                    basic_overlap_info.check_multiple_overlap(target_tables)
303                };
304
305                // We allow a layer in the middle without overlap, so we need to continue to
306                // the next layer to search for overlap
307                if overlap_files_range.is_empty() {
308                    // empty level
309                    continue;
310                }
311
312                for other in &target_tables[overlap_files_range.clone()] {
313                    if level_handler.is_pending_compact(&other.sst_id) {
314                        break 'expand_new_level;
315                    }
316                    basic_overlap_info.update(&other.key_range);
317
318                    add_files_size += other.sst_size;
319                    add_files_count += 1;
320                }
321
322                overlap_levels.push((reverse_index, overlap_files_range.clone()));
323                select_level_count += 1;
324            }
325
326            if select_level_count > max_select_level_count {
327                max_select_level_count = select_level_count;
328                pick_levels_range = overlap_levels;
329            }
330
331            // When size / file count has exceeded the limit, we need to abandon this plan, it cannot be expanded to the last sub_level
332            if max_select_level_count >= self.min_expected_level_count
333                && (add_files_size >= self.max_compaction_bytes
334                    || add_files_count >= self.max_file_count as usize)
335            {
336                break 'expand_new_level;
337            }
338        }
339
340        if !pick_levels_range.is_empty() {
341            for (reverse_index, sst_range) in pick_levels_range {
342                let level_ssts = &levels[reverse_index].table_infos;
343                ret.sstable_infos[reverse_index].1 = level_ssts[sst_range].to_vec();
344                ret.total_file_count += ret.sstable_infos[reverse_index].1.len();
345                ret.total_file_size += ret.sstable_infos[reverse_index]
346                    .1
347                    .iter()
348                    .map(|sst| sst.sst_size)
349                    .sum::<u64>();
350            }
351
352            // sort sst per level due to reverse expand
353            ret.sstable_infos
354                .iter_mut()
355                .for_each(|(_sub_level_id, level_ssts)| {
356                    level_ssts.sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
357                });
358        } else {
359            ret.total_file_count = 0;
360            ret.total_file_size = 0;
361        }
362
363        if self.enable_check_task_level_overlap {
364            self.verify_task_level_overlap(&ret, levels);
365        }
366
367        ret.sstable_infos
368            .retain(|(_sub_level_id, ssts)| !ssts.is_empty());
369
370        // To check whether the task is expected
371        if ret.total_file_size > self.max_compaction_bytes
372            || ret.total_file_count as u64 > self.max_file_count
373            || ret.sstable_infos.len() > self.max_expected_level_count
374        {
375            // rotate the sstables to meet the `max_file_count` and `max_compaction_bytes` and `max_expected_level_count`
376            let mut total_file_count = 0;
377            let mut total_file_size = 0;
378            let mut total_level_count = 0;
379            for (index, (_sub_level_id, sstables)) in ret.sstable_infos.iter().enumerate() {
380                total_file_count += sstables.len();
381                total_file_size += sstables.iter().map(|sst| sst.sst_size).sum::<u64>();
382                total_level_count += 1;
383
384                // Atleast `min_expected_level_count`` level should be selected
385                if total_level_count >= self.min_expected_level_count
386                    && (total_file_count as u64 >= self.max_file_count
387                        || total_file_size >= self.max_compaction_bytes
388                        || total_level_count >= self.max_expected_level_count)
389                {
390                    ret.total_file_count = total_file_count;
391                    ret.total_file_size = total_file_size;
392                    ret.sstable_infos.truncate(index + 1);
393                    break;
394                }
395            }
396        }
397
398        if ret.sstable_infos.is_empty() {
399            return None;
400        }
401
402        Some(ret)
403    }
404
405    pub fn pick_l0_multi_non_overlap_level(
406        &self,
407        l0: &[Level],
408        level_handler: &LevelHandler,
409    ) -> Vec<SubLevelSstables> {
410        if l0.len() < self.min_expected_level_count {
411            return vec![];
412        }
413
414        let mut scores = vec![];
415        // To find the sub_level with the most files;
416        let intervals_level_idx = if self.enable_optimize_l0_interval_selection {
417            l0.iter()
418                .enumerate()
419                .max_by(|(idx1, levels1), (idx2, levels2)| {
420                    levels1
421                        .table_infos
422                        .len()
423                        .cmp(&levels2.table_infos.len())
424                        .then(idx2.cmp(idx1))
425                })
426                .map(|(idx, _)| idx)
427                .unwrap()
428        } else {
429            0
430        };
431
432        let intervals = &l0[intervals_level_idx].table_infos;
433        for sst in intervals {
434            if level_handler.is_pending_compact(&sst.sst_id) {
435                continue;
436            }
437
438            if let Some(score) = self.pick_sub_level(l0, level_handler, sst) {
439                scores.push(score);
440            }
441        }
442
443        if scores.is_empty() {
444            return vec![];
445        }
446
447        let mut expected = Vec::with_capacity(scores.len());
448        let mut unexpected = vec![];
449
450        for mut selected_task in scores {
451            if selected_task.sstable_infos.len() > self.max_expected_level_count
452                || selected_task.sstable_infos.len() < self.min_expected_level_count
453                || selected_task.total_file_size < self.min_compaction_bytes
454            {
455                selected_task.expected = false;
456                unexpected.push(selected_task);
457            } else {
458                selected_task.expected = true;
459                expected.push(selected_task);
460            }
461        }
462
463        // The logic of sorting depends on the interval we expect to select.
464        // 1. contain as many levels as possible
465        // 2. fewer files in the bottom sub level, containing as many smaller intervals as possible.
466        expected.sort_by(|a, b| {
467            b.sstable_infos
468                .len()
469                .cmp(&a.sstable_infos.len())
470                .then_with(|| a.total_file_count.cmp(&b.total_file_count))
471                .then_with(|| a.total_file_size.cmp(&b.total_file_size))
472        });
473
474        // For unexpected tasks, We devised a separate algorithm to evaluate the priority of a task, based on the limit passed in,
475        // we set tasks close to the limit to be high priority, here have three attributes:
476        // 1. The number of levels selected is close to the limit
477        // 2. The number of files selected is close to the limit
478        // 3. The size of the selected file is close to the limit
479        unexpected.sort_by(|a, b| {
480            let a_select_count_offset =
481                (a.sstable_infos.len() as i64 - self.max_expected_level_count as i64).abs();
482            let b_select_count_offset =
483                (b.sstable_infos.len() as i64 - self.max_expected_level_count as i64).abs();
484
485            let a_file_count_offset =
486                (a.total_file_count as i64 - self.max_file_count as i64).abs();
487            let b_file_count_offset =
488                (b.total_file_count as i64 - self.max_file_count as i64).abs();
489
490            let a_file_size_offset =
491                (a.total_file_size as i64 - self.max_compaction_bytes as i64).abs();
492            let b_file_size_offset =
493                (b.total_file_size as i64 - self.max_compaction_bytes as i64).abs();
494
495            a_select_count_offset
496                .cmp(&b_select_count_offset)
497                .then_with(|| a_file_count_offset.cmp(&b_file_count_offset))
498                .then_with(|| a_file_size_offset.cmp(&b_file_size_offset))
499        });
500
501        expected.extend(unexpected);
502
503        expected
504    }
505
506    fn verify_task_level_overlap(&self, ret: &SubLevelSstables, levels: &[Level]) {
507        use std::fmt::Write;
508
509        use itertools::Itertools;
510
511        use crate::hummock::compaction::overlap_strategy::OverlapInfo;
512        let mut overlap_info: Option<Box<dyn OverlapInfo>> = None;
513        for (level_idx, (_sub_level_id, ssts)) in ret.sstable_infos.iter().enumerate().rev() {
514            if let Some(overlap_info) = overlap_info.as_mut() {
515                // skip the check if `overlap_info` is not initialized (i.e. the first non-empty level is not met)
516                let level = levels.get(level_idx).unwrap();
517                let overlap_sst_range = overlap_info.check_multiple_overlap(&level.table_infos);
518                if !overlap_sst_range.is_empty() {
519                    let expected_sst_ids = level.table_infos[overlap_sst_range.clone()]
520                        .iter()
521                        .map(|s| s.object_id)
522                        .collect_vec();
523                    let actual_sst_ids = ssts.iter().map(|s| s.object_id).collect_vec();
524                    // `actual_sst_ids` can be larger than `expected_sst_ids` because we may use a larger key range to select SSTs.
525                    // `expected_sst_ids` must be a sub-range of `actual_sst_ids` to ensure correctness.
526                    let start_idx = actual_sst_ids
527                        .iter()
528                        .position(|sst_id| sst_id == expected_sst_ids.first().unwrap());
529                    if start_idx.is_none_or(|idx| {
530                        actual_sst_ids[idx..idx + expected_sst_ids.len()] != expected_sst_ids
531                    }) {
532                        // Print SstableInfo for `actual_sst_ids`
533                        let mut actual_sst_infos = String::new();
534                        ssts.iter()
535                            .for_each(|s| append_sstable_info_to_string(&mut actual_sst_infos, s));
536
537                        // Print SstableInfo for `expected_sst_ids`
538                        let mut expected_sst_infos = String::new();
539                        level.table_infos[overlap_sst_range.clone()]
540                            .iter()
541                            .for_each(|s| {
542                                append_sstable_info_to_string(&mut expected_sst_infos, s)
543                            });
544
545                        // Print SstableInfo for selected ssts in all sub-levels
546                        let mut ret_sst_infos = String::new();
547                        ret.sstable_infos.iter().enumerate().for_each(
548                            |(idx, (_sub_level_id, ssts))| {
549                                writeln!(
550                                    ret_sst_infos,
551                                    "sub level {}",
552                                    levels.get(idx).unwrap().sub_level_id
553                                )
554                                .unwrap();
555                                ssts.iter().for_each(|s| {
556                                    append_sstable_info_to_string(&mut ret_sst_infos, s)
557                                });
558                            },
559                        );
560                        panic!(
561                            "Compact task overlap check fails. Actual: {} Expected: {} Ret {}",
562                            actual_sst_infos, expected_sst_infos, ret_sst_infos
563                        );
564                    }
565                }
566            } else if !ssts.is_empty() {
567                // init the `overlap_info` when meeting the first non-empty level.
568                overlap_info = Some(self.overlap_strategy.create_overlap_info());
569            }
570
571            for sst in ssts {
572                overlap_info.as_mut().unwrap().update(&sst.key_range);
573            }
574        }
575    }
576}
577
578#[cfg(test)]
579pub mod tests {
580    use std::collections::BTreeSet;
581
582    use risingwave_common::config::default::compaction_config;
583
584    use super::*;
585    use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
586    use crate::hummock::compaction::selector::tests::{
587        generate_l0_nonoverlapping_sublevels, generate_table,
588    };
589
590    #[test]
591    fn test_compact_l1() {
592        let mut picker =
593            MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
594        let levels = vec![
595            Level {
596                level_idx: 1,
597                level_type: LevelType::Nonoverlapping,
598                table_infos: vec![
599                    generate_table(0, 1, 0, 100, 1),
600                    generate_table(1, 1, 101, 200, 1),
601                    generate_table(2, 1, 222, 300, 1),
602                ],
603                ..Default::default()
604            },
605            Level {
606                level_idx: 2,
607                level_type: LevelType::Nonoverlapping,
608                table_infos: vec![
609                    generate_table(4, 1, 0, 100, 1),
610                    generate_table(5, 1, 101, 150, 1),
611                    generate_table(6, 1, 151, 201, 1),
612                    generate_table(7, 1, 501, 800, 1),
613                    generate_table(8, 2, 301, 400, 1),
614                ],
615                ..Default::default()
616            },
617        ];
618        let levels = Levels {
619            levels,
620            l0: generate_l0_nonoverlapping_sublevels(vec![]),
621            ..Default::default()
622        };
623        let mut level_handlers = vec![
624            LevelHandler::new(0),
625            LevelHandler::new(1),
626            LevelHandler::new(2),
627        ];
628
629        // pick a non-overlapping files. It means that this file could be trivial move to next
630        // level.
631        let mut local_stats = LocalPickerStatistic::default();
632        let ret = picker
633            .pick_compaction(&levels, &level_handlers, &mut local_stats)
634            .unwrap();
635        assert_eq!(ret.input_levels[0].level_idx, 1);
636        assert_eq!(ret.target_level, 2);
637        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
638        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 2);
639        assert_eq!(ret.input_levels[1].table_infos.len(), 0);
640        ret.add_pending_task(0, &mut level_handlers);
641
642        let ret = picker
643            .pick_compaction(&levels, &level_handlers, &mut local_stats)
644            .unwrap();
645        assert_eq!(ret.input_levels[0].level_idx, 1);
646        assert_eq!(ret.target_level, 2);
647        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
648        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
649        assert_eq!(ret.input_levels[1].table_infos.len(), 1);
650        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
651        ret.add_pending_task(1, &mut level_handlers);
652
653        let ret = picker
654            .pick_compaction(&levels, &level_handlers, &mut local_stats)
655            .unwrap();
656        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
657        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 1);
658        assert_eq!(ret.input_levels[1].table_infos.len(), 2);
659        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
660    }
661
662    #[test]
663    fn test_expand_l1_files() {
664        let mut picker =
665            MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
666        let levels = vec![
667            Level {
668                level_idx: 1,
669                level_type: LevelType::Nonoverlapping,
670                table_infos: vec![
671                    generate_table(0, 1, 50, 99, 2),
672                    generate_table(1, 1, 100, 149, 2),
673                    generate_table(2, 1, 150, 249, 2),
674                ],
675                ..Default::default()
676            },
677            Level {
678                level_idx: 2,
679                level_type: LevelType::Nonoverlapping,
680                table_infos: vec![
681                    generate_table(4, 1, 50, 199, 1),
682                    generate_table(5, 1, 200, 399, 1),
683                ],
684                ..Default::default()
685            },
686        ];
687        let levels = Levels {
688            levels,
689            l0: generate_l0_nonoverlapping_sublevels(vec![]),
690            ..Default::default()
691        };
692        let levels_handler = vec![
693            LevelHandler::new(0),
694            LevelHandler::new(1),
695            LevelHandler::new(2),
696        ];
697
698        // pick a non-overlapping files. It means that this file could be trivial move to next
699        // level.
700        let ret = picker
701            .pick_compaction(
702                &levels,
703                &levels_handler,
704                &mut LocalPickerStatistic::default(),
705            )
706            .unwrap();
707        assert_eq!(ret.input_levels[0].level_idx, 1);
708        assert_eq!(ret.input_levels[1].level_idx, 2);
709
710        assert_eq!(ret.input_levels[0].table_infos.len(), 2);
711        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
712        assert_eq!(ret.input_levels[0].table_infos[1].sst_id, 1);
713
714        assert_eq!(ret.input_levels[1].table_infos.len(), 1);
715        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
716    }
717
718    #[test]
719    fn test_pick_l0_multi_level() {
720        let levels = vec![
721            Level {
722                level_idx: 1,
723                level_type: LevelType::Nonoverlapping,
724                table_infos: vec![
725                    generate_table(0, 1, 50, 99, 2),
726                    generate_table(1, 1, 100, 149, 2),
727                    generate_table(2, 1, 150, 249, 2),
728                    generate_table(6, 1, 250, 300, 2),
729                    generate_table(7, 1, 350, 400, 2),
730                    generate_table(8, 1, 450, 500, 2),
731                ],
732                total_file_size: 800,
733                ..Default::default()
734            },
735            Level {
736                level_idx: 2,
737                level_type: LevelType::Nonoverlapping,
738                table_infos: vec![
739                    generate_table(4, 1, 50, 199, 1),
740                    generate_table(5, 1, 200, 249, 1),
741                    generate_table(9, 1, 250, 300, 2),
742                    generate_table(10, 1, 350, 400, 2),
743                    generate_table(11, 1, 450, 500, 2),
744                ],
745                total_file_size: 350,
746                ..Default::default()
747            },
748            Level {
749                level_idx: 3,
750                level_type: LevelType::Nonoverlapping,
751                table_infos: vec![
752                    generate_table(11, 1, 250, 300, 2),
753                    generate_table(12, 1, 350, 400, 2),
754                    generate_table(13, 1, 450, 500, 2),
755                ],
756                total_file_size: 150,
757                ..Default::default()
758            },
759            Level {
760                level_idx: 4,
761                level_type: LevelType::Nonoverlapping,
762                table_infos: vec![
763                    generate_table(14, 1, 250, 300, 2),
764                    generate_table(15, 1, 350, 400, 2),
765                    generate_table(16, 1, 450, 500, 2),
766                ],
767                total_file_size: 150,
768                ..Default::default()
769            },
770        ];
771
772        let levels_handlers = vec![
773            LevelHandler::new(0),
774            LevelHandler::new(1),
775            LevelHandler::new(2),
776        ];
777
778        {
779            // no limit
780            let picker = NonOverlapSubLevelPicker::new(
781                0,
782                10000,
783                1,
784                10000,
785                Arc::new(RangeOverlapStrategy::default()),
786                true,
787                compaction_config::max_l0_compact_level_count() as usize,
788                true,
789            );
790            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
791            assert_eq!(6, ret.len());
792        }
793
794        {
795            // limit max bytes
796            let picker = NonOverlapSubLevelPicker::new(
797                0,
798                100,
799                1,
800                10000,
801                Arc::new(RangeOverlapStrategy::default()),
802                true,
803                compaction_config::max_l0_compact_level_count() as usize,
804                true,
805            );
806            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
807            assert_eq!(6, ret.len());
808        }
809
810        {
811            // limit max file_count
812            let picker = NonOverlapSubLevelPicker::new(
813                0,
814                10000,
815                1,
816                5,
817                Arc::new(RangeOverlapStrategy::default()),
818                true,
819                compaction_config::max_l0_compact_level_count() as usize,
820                true,
821            );
822            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
823            assert_eq!(6, ret.len());
824        }
825    }
826
827    #[test]
828    fn test_pick_l0_multi_level2() {
829        let levels = vec![
830            Level {
831                level_idx: 1,
832                level_type: LevelType::Nonoverlapping,
833                table_infos: vec![
834                    generate_table(0, 1, 50, 99, 2),
835                    generate_table(1, 1, 100, 149, 2),
836                    generate_table(2, 1, 150, 249, 2),
837                    generate_table(6, 1, 250, 300, 2),
838                    generate_table(7, 1, 350, 400, 2),
839                    generate_table(8, 1, 450, 500, 2),
840                ],
841                total_file_size: 800,
842                ..Default::default()
843            },
844            Level {
845                level_idx: 2,
846                level_type: LevelType::Nonoverlapping,
847                table_infos: vec![
848                    generate_table(4, 1, 50, 99, 1),
849                    generate_table(5, 1, 150, 200, 1),
850                    generate_table(9, 1, 250, 300, 2),
851                    generate_table(10, 1, 350, 400, 2),
852                    generate_table(11, 1, 450, 500, 2),
853                ],
854                total_file_size: 250,
855                ..Default::default()
856            },
857            Level {
858                level_idx: 3,
859                level_type: LevelType::Nonoverlapping,
860                table_infos: vec![
861                    generate_table(11, 1, 250, 300, 2),
862                    generate_table(12, 1, 350, 400, 2),
863                    generate_table(13, 1, 450, 500, 2),
864                ],
865                total_file_size: 150,
866                ..Default::default()
867            },
868            Level {
869                level_idx: 4,
870                level_type: LevelType::Nonoverlapping,
871                table_infos: vec![
872                    generate_table(14, 1, 250, 300, 2),
873                    generate_table(15, 1, 350, 400, 2),
874                    generate_table(16, 1, 450, 500, 2),
875                ],
876                total_file_size: 150,
877                ..Default::default()
878            },
879        ];
880
881        let levels_handlers = vec![
882            LevelHandler::new(0),
883            LevelHandler::new(1),
884            LevelHandler::new(2),
885        ];
886
887        {
888            // no limit
889            let picker = NonOverlapSubLevelPicker::new(
890                0,
891                10000,
892                1,
893                10000,
894                Arc::new(RangeOverlapStrategy::default()),
895                true,
896                compaction_config::max_l0_compact_level_count() as usize,
897                true,
898            );
899            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
900            assert_eq!(6, ret.len());
901        }
902
903        {
904            // limit max bytes
905            let max_compaction_bytes = 100;
906            let picker = NonOverlapSubLevelPicker::new(
907                60,
908                max_compaction_bytes,
909                1,
910                10000,
911                Arc::new(RangeOverlapStrategy::default()),
912                true,
913                compaction_config::max_l0_compact_level_count() as usize,
914                true,
915            );
916            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
917            assert_eq!(6, ret.len());
918
919            for plan in &ret {
920                if plan.total_file_size >= max_compaction_bytes {
921                    assert!(plan.expected);
922                } else {
923                    assert!(!plan.expected);
924                }
925            }
926        }
927
928        {
929            // limit max file_count
930            let max_file_count = 2;
931            let picker = NonOverlapSubLevelPicker::new(
932                0,
933                10000,
934                1,
935                max_file_count,
936                Arc::new(RangeOverlapStrategy::default()),
937                true,
938                compaction_config::max_l0_compact_level_count() as usize,
939                true,
940            );
941            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
942            assert_eq!(6, ret.len());
943
944            for plan in ret {
945                let mut sst_id_set = BTreeSet::default();
946                for sst in &plan.sstable_infos {
947                    sst_id_set.insert(sst.1[0].sst_id);
948                }
949                assert!(sst_id_set.len() <= max_file_count as usize);
950            }
951        }
952
953        {
954            // limit min_depth
955            let min_depth = 3;
956            let picker = NonOverlapSubLevelPicker::new(
957                10,
958                10000,
959                min_depth,
960                100,
961                Arc::new(RangeOverlapStrategy::default()),
962                true,
963                compaction_config::max_l0_compact_level_count() as usize,
964                true,
965            );
966            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
967            assert_eq!(6, ret.len());
968
969            for plan in ret {
970                if plan.sstable_infos.len() >= min_depth {
971                    assert!(plan.expected);
972                } else {
973                    assert!(!plan.expected);
974                }
975            }
976        }
977    }
978
979    #[test]
980    fn test_trivial_move_bug() {
981        let levels = [
982            Level {
983                level_idx: 1,
984                level_type: LevelType::Nonoverlapping,
985                table_infos: vec![generate_table(0, 1, 400, 500, 2)],
986                total_file_size: 100,
987                ..Default::default()
988            },
989            Level {
990                level_idx: 2,
991                level_type: LevelType::Nonoverlapping,
992                table_infos: vec![
993                    generate_table(1, 1, 100, 200, 1),
994                    generate_table(2, 1, 600, 700, 1),
995                ],
996                total_file_size: 200,
997                ..Default::default()
998            },
999            Level {
1000                level_idx: 3,
1001                level_type: LevelType::Nonoverlapping,
1002                table_infos: vec![
1003                    generate_table(3, 1, 100, 300, 2),
1004                    generate_table(4, 1, 600, 800, 1),
1005                ],
1006                total_file_size: 400,
1007                ..Default::default()
1008            },
1009        ];
1010
1011        let levels_handlers = vec![
1012            LevelHandler::new(0),
1013            LevelHandler::new(1),
1014            LevelHandler::new(2),
1015            LevelHandler::new(3),
1016        ];
1017        // no limit
1018        let picker =
1019            MinOverlappingPicker::new(2, 3, 1000, 0, Arc::new(RangeOverlapStrategy::default()));
1020        let (select_files, target_files) = picker.pick_tables(
1021            &levels[1].table_infos,
1022            &levels[2].table_infos,
1023            &levels_handlers,
1024        );
1025        let overlap_strategy = Arc::new(RangeOverlapStrategy::default());
1026        let mut overlap_info = overlap_strategy.create_overlap_info();
1027        for sst in &select_files {
1028            overlap_info.update(&sst.key_range);
1029        }
1030        let range = overlap_info.check_multiple_overlap(&levels[0].table_infos);
1031        assert!(range.is_empty());
1032        assert_eq!(select_files.len(), 1);
1033        assert_eq!(target_files.len(), 1);
1034    }
1035
1036    #[test]
1037    fn test_pick_unexpected_task() {
1038        let levels = vec![
1039            Level {
1040                level_idx: 1,
1041                level_type: LevelType::Nonoverlapping,
1042                table_infos: vec![generate_table(0, 1, 50, 100, 2)], // 50
1043                total_file_size: 50,
1044                ..Default::default()
1045            },
1046            Level {
1047                level_idx: 2,
1048                level_type: LevelType::Nonoverlapping,
1049                table_infos: vec![
1050                    generate_table(1, 1, 101, 150, 1), // 50
1051                ],
1052                total_file_size: 50,
1053                ..Default::default()
1054            },
1055            Level {
1056                level_idx: 3,
1057                level_type: LevelType::Nonoverlapping,
1058                table_infos: vec![
1059                    generate_table(2, 1, 151, 200, 2), // 50
1060                ],
1061                total_file_size: 50,
1062                ..Default::default()
1063            },
1064            Level {
1065                level_idx: 4,
1066                level_type: LevelType::Nonoverlapping,
1067                table_infos: vec![
1068                    generate_table(3, 1, 50, 300, 2), // 250
1069                ],
1070                total_file_size: 250,
1071                ..Default::default()
1072            },
1073        ];
1074
1075        let levels_handlers = vec![
1076            LevelHandler::new(0),
1077            LevelHandler::new(1),
1078            LevelHandler::new(2),
1079            LevelHandler::new(3),
1080            LevelHandler::new(4),
1081        ];
1082
1083        {
1084            // no limit
1085            let picker = NonOverlapSubLevelPicker::new(
1086                0,
1087                10000,
1088                1,
1089                10000,
1090                Arc::new(RangeOverlapStrategy::default()),
1091                true,
1092                compaction_config::max_l0_compact_level_count() as usize,
1093                true,
1094            );
1095            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1096            {
1097                let plan = &ret[0];
1098                assert_eq!(4, plan.sstable_infos.len());
1099            }
1100        }
1101
1102        {
1103            // limit size
1104            let picker = NonOverlapSubLevelPicker::new(
1105                0,
1106                150,
1107                1,
1108                10000,
1109                Arc::new(RangeOverlapStrategy::default()),
1110                true,
1111                compaction_config::max_l0_compact_level_count() as usize,
1112                true,
1113            );
1114            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1115            {
1116                let plan = &ret[0];
1117                assert_eq!(3, plan.sstable_infos.len());
1118            }
1119        }
1120
1121        {
1122            // limit count
1123            let picker = NonOverlapSubLevelPicker::new(
1124                0,
1125                10000,
1126                1,
1127                3,
1128                Arc::new(RangeOverlapStrategy::default()),
1129                true,
1130                compaction_config::max_l0_compact_level_count() as usize,
1131                true,
1132            );
1133            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1134            {
1135                let plan = &ret[0];
1136                assert_eq!(3, plan.sstable_infos.len());
1137            }
1138        }
1139
1140        {
1141            // limit expected level count
1142            let max_expected_level_count = 3;
1143            let picker = NonOverlapSubLevelPicker::for_test(
1144                0,
1145                10000,
1146                1,
1147                3,
1148                Arc::new(RangeOverlapStrategy::default()),
1149                true,
1150                max_expected_level_count,
1151                true,
1152            );
1153            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1154            {
1155                let plan = &ret[0];
1156                assert_eq!(max_expected_level_count, plan.sstable_infos.len());
1157
1158                assert_eq!(0, plan.sstable_infos[0].1[0].sst_id);
1159                assert_eq!(1, plan.sstable_infos[1].1[0].sst_id);
1160                assert_eq!(2, plan.sstable_infos[2].1[0].sst_id);
1161            }
1162        }
1163
1164        {
1165            // limit min_compacaion_bytes
1166            let max_expected_level_count = 100;
1167            let picker = NonOverlapSubLevelPicker::for_test(
1168                1000,
1169                10000,
1170                1,
1171                100,
1172                Arc::new(RangeOverlapStrategy::default()),
1173                true,
1174                max_expected_level_count,
1175                true,
1176            );
1177            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1178            {
1179                let plan = &ret[0];
1180
1181                assert_eq!(0, plan.sstable_infos[0].1[0].sst_id);
1182                assert_eq!(1, plan.sstable_infos[1].1[0].sst_id);
1183                assert_eq!(2, plan.sstable_infos[2].1[0].sst_id);
1184                assert_eq!(3, plan.sstable_infos[3].1[0].sst_id);
1185                assert!(!plan.expected);
1186            }
1187        }
1188    }
1189}