risingwave_meta/hummock/compaction/picker/
min_overlap_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_hummock_sdk::append_sstable_info_to_string;
18use risingwave_hummock_sdk::level::{InputLevel, Level, Levels};
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20use risingwave_pb::hummock::LevelType;
21
22use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
23use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
24use crate::hummock::level_handler::LevelHandler;
25
/// Picks files from one level together with the files they overlap in a target level,
/// preferring the candidate with the smallest target-to-select size ratio
/// (see `MinOverlappingPicker::pick_tables`).
pub struct MinOverlappingPicker {
    /// Index of the level files are selected from; also used to index `level_handlers`.
    level: usize,
    /// Index of the level the selected files are compacted into; also used to index
    /// `level_handlers`.
    target_level: usize,
    /// A run of selected files stops growing once its accumulated size exceeds this value.
    max_select_bytes: u64,
    /// Copied unchanged into the `CompactionInput` produced by `pick_compaction`.
    vnode_partition_count: u32,
    /// Factory for the overlap trackers used to find overlapping files in the target level.
    overlap_strategy: Arc<dyn OverlapStrategy>,
}
33
34impl MinOverlappingPicker {
35    pub fn new(
36        level: usize,
37        target_level: usize,
38        max_select_bytes: u64,
39        vnode_partition_count: u32,
40        overlap_strategy: Arc<dyn OverlapStrategy>,
41    ) -> MinOverlappingPicker {
42        MinOverlappingPicker {
43            level,
44            target_level,
45            max_select_bytes,
46            vnode_partition_count,
47            overlap_strategy,
48        }
49    }
50
51    pub fn pick_tables(
52        &self,
53        select_tables: &[SstableInfo],
54        target_tables: &[SstableInfo],
55        level_handlers: &[LevelHandler],
56    ) -> (Vec<SstableInfo>, Vec<SstableInfo>) {
57        let mut select_file_ranges = vec![];
58        for (idx, sst) in select_tables.iter().enumerate() {
59            if level_handlers[self.level].is_pending_compact(&sst.sst_id) {
60                continue;
61            }
62            let mut overlap_info = self.overlap_strategy.create_overlap_info();
63            overlap_info.update(sst);
64            let overlap_files_range = overlap_info.check_multiple_overlap(target_tables);
65
66            if overlap_files_range.is_empty() {
67                return (vec![sst.clone()], vec![]);
68            }
69            select_file_ranges.push((idx, overlap_files_range));
70        }
71        select_file_ranges.retain(|(_, range)| {
72            let mut pending_compact = false;
73            for other in &target_tables[range.clone()] {
74                if level_handlers[self.target_level].is_pending_compact(&other.sst_id) {
75                    pending_compact = true;
76                    break;
77                }
78            }
79            !pending_compact
80        });
81
82        let mut min_score = u64::MAX;
83        let mut min_score_select_range = 0..0;
84        let mut min_score_target_range = 0..0;
85        let mut min_score_select_file_size = 0;
86        for left in 0..select_file_ranges.len() {
87            let mut select_file_size = 0;
88            let mut target_level_overlap_range = select_file_ranges[left].1.clone();
89            let mut total_file_size = 0;
90            for other in &target_tables[target_level_overlap_range.clone()] {
91                total_file_size += other.sst_size;
92            }
93            let start_idx = select_file_ranges[left].0;
94            let mut end_idx = start_idx + 1;
95            for (idx, range) in select_file_ranges.iter().skip(left) {
96                if select_file_size > self.max_select_bytes
97                    || *idx > end_idx
98                    || range.start >= target_level_overlap_range.end
99                {
100                    break;
101                }
102                select_file_size += select_tables[*idx].sst_size;
103                if range.end > target_level_overlap_range.end {
104                    for other in &target_tables[target_level_overlap_range.end..range.end] {
105                        total_file_size += other.sst_size;
106                    }
107                    target_level_overlap_range.end = range.end;
108                }
109                let score = if select_file_size == 0 {
110                    total_file_size
111                } else {
112                    total_file_size * 100 / select_file_size
113                };
114                end_idx = idx + 1;
115                if score < min_score
116                    || (score == min_score && select_file_size < min_score_select_file_size)
117                {
118                    min_score = score;
119                    min_score_select_range = start_idx..end_idx;
120                    min_score_target_range = target_level_overlap_range.clone();
121                    min_score_select_file_size = select_file_size;
122                }
123            }
124        }
125        if min_score == u64::MAX {
126            return (vec![], vec![]);
127        }
128        let select_input_ssts = select_tables[min_score_select_range].to_vec();
129        let target_input_ssts = target_tables[min_score_target_range].to_vec();
130        (select_input_ssts, target_input_ssts)
131    }
132}
133
134impl CompactionPicker for MinOverlappingPicker {
135    fn pick_compaction(
136        &mut self,
137        levels: &Levels,
138        level_handlers: &[LevelHandler],
139        stats: &mut LocalPickerStatistic,
140    ) -> Option<CompactionInput> {
141        assert!(self.level > 0);
142        let (select_input_ssts, target_input_ssts) = self.pick_tables(
143            &levels.get_level(self.level).table_infos,
144            &levels.get_level(self.target_level).table_infos,
145            level_handlers,
146        );
147        if select_input_ssts.is_empty() {
148            stats.skip_by_pending_files += 1;
149            return None;
150        }
151        Some(CompactionInput {
152            select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
153            target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
154            total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
155            input_levels: vec![
156                InputLevel {
157                    level_idx: self.level as u32,
158                    level_type: LevelType::Nonoverlapping,
159                    table_infos: select_input_ssts,
160                },
161                InputLevel {
162                    level_idx: self.target_level as u32,
163                    level_type: LevelType::Nonoverlapping,
164                    table_infos: target_input_ssts,
165                },
166            ],
167            target_level: self.target_level,
168            vnode_partition_count: self.vnode_partition_count,
169            ..Default::default()
170        })
171    }
172}
173
/// A candidate compaction task over L0 sub-levels: the SSTs selected per sub-level plus
/// aggregate totals.
#[derive(Default, Debug)]
pub struct SubLevelSstables {
    /// Sum of `sst_size` over the selected SSTs.
    pub total_file_size: u64,
    /// Number of selected SSTs across all sub-levels.
    pub total_file_count: usize,
    /// Selected SSTs grouped by sub-level. `pick_sub_level` removes empty groups before
    /// returning, so each remaining entry holds at least one SST.
    pub sstable_infos: Vec<Vec<SstableInfo>>,
    /// Set by `pick_l0_multi_non_overlap_level`: `true` when the task is within the picker's
    /// level-count bounds and reaches its minimum size.
    pub expected: bool,
}
181
/// Picks compaction tasks that span several non-overlapping L0 sub-levels, one candidate per
/// SST of the first sub-level.
pub struct NonOverlapSubLevelPicker {
    /// Tasks whose total size is below this are classified as "unexpected".
    min_compaction_bytes: u64,
    /// Size limit of a task; applied only once `min_expected_level_count` levels are selected.
    max_compaction_bytes: u64,
    /// Minimum number of sub-levels a task should span. Size/count limits are not enforced
    /// before this many levels have been selected.
    min_expected_level_count: usize,
    /// File-count limit of a task; applied only once `min_expected_level_count` levels are
    /// selected.
    max_file_count: u64,
    /// Factory for the overlap trackers used while expanding across sub-levels.
    overlap_strategy: Arc<dyn OverlapStrategy>,
    /// When `true`, every picked task is checked by `verify_task_level_overlap`, which panics
    /// if the selection is inconsistent.
    enable_check_task_level_overlap: bool,
    /// Maximum number of sub-levels a task should span.
    max_expected_level_count: usize,
}
191
192impl NonOverlapSubLevelPicker {
    /// Creates a picker from the given limits; every argument is stored unchanged in the
    /// field of the same name.
    pub fn new(
        min_compaction_bytes: u64,
        max_compaction_bytes: u64,
        min_expected_level_count: usize,
        max_file_count: u64,
        overlap_strategy: Arc<dyn OverlapStrategy>,
        enable_check_task_level_overlap: bool,
        max_expected_level_count: usize,
    ) -> Self {
        Self {
            min_compaction_bytes,
            max_compaction_bytes,
            min_expected_level_count,
            max_file_count,
            overlap_strategy,
            enable_check_task_level_overlap,
            max_expected_level_count,
        }
    }
212
213    #[cfg(test)]
214    pub fn for_test(
215        min_compaction_bytes: u64,
216        max_compaction_bytes: u64,
217        min_expected_level_count: usize,
218        max_file_count: u64,
219        overlap_strategy: Arc<dyn OverlapStrategy>,
220        enable_check_task_level_overlap: bool,
221        max_expected_level_count: usize,
222    ) -> Self {
223        Self {
224            min_compaction_bytes,
225            max_compaction_bytes,
226            min_expected_level_count,
227            max_file_count,
228            overlap_strategy,
229            enable_check_task_level_overlap,
230            max_expected_level_count,
231        }
232    }
233
    /// Uses the incoming `sst` as the basic key range and selects as many sub-levels as
    /// possible around it.
    ///
    /// 1. Build the basic range from `sst`.
    /// 2. Add one new sub-level in each round.
    /// 3. Expand the selection from the newest sub-level of the round back down to
    ///    `levels[0]`, widening the range with every file picked.
    /// 4. Subject to the size and count limits, keep the plan that contains the most
    ///    sub-levels.
    ///
    /// Returns the chosen SSTs with empty sub-levels removed. When no plan could be built the
    /// result contains only `sst` itself. If `enable_check_task_level_overlap` is set, the
    /// selection is verified first and the verification panics on inconsistency.
    fn pick_sub_level(
        &self,
        levels: &[Level],
        level_handler: &LevelHandler,
        sst: &SstableInfo,
    ) -> SubLevelSstables {
        // One (initially empty) slot per input level, indexed like `levels`.
        let mut ret = SubLevelSstables {
            sstable_infos: vec![vec![]; levels.len()],
            ..Default::default()
        };

        // Best plan found so far: (level index, SST index range inside that level).
        let mut pick_levels_range = Vec::default();
        let mut max_select_level_count = 0;

        // Pay attention to the order here: Make sure to select the lowest sub_level to meet the requirements of base compaction. If you break the assumption of this order, you need to redesign it.
        // TODO: Use binary selection to replace the step algorithm to optimize algorithm complexity
        'expand_new_level: for (target_index, target_level) in levels.iter().enumerate().skip(1) {
            if target_level.level_type != LevelType::Nonoverlapping {
                break;
            }

            // NOTE(review): `ret.sstable_infos` is only filled in after this loop, so every
            // entry is still empty here, the count is always 0 and this guard never fires.
            // It may have been meant to look at `max_select_level_count` — confirm.
            if ret
                .sstable_infos
                .iter()
                .filter(|ssts| !ssts.is_empty())
                .count()
                > self.max_expected_level_count
            {
                break;
            }

            // reset the `basic_overlap_info` with basic sst
            let mut basic_overlap_info = self.overlap_strategy.create_overlap_info();
            basic_overlap_info.update(sst);

            // Try `check_multiple_include` first and fall back to `check_multiple_overlap`
            // when it finds nothing.
            let mut overlap_files_range =
                basic_overlap_info.check_multiple_include(&target_level.table_infos);
            if overlap_files_range.is_empty() {
                overlap_files_range =
                    basic_overlap_info.check_multiple_overlap(&target_level.table_infos);
            }

            // This level has nothing in range; try the next one.
            if overlap_files_range.is_empty() {
                continue;
            }

            let mut overlap_levels = vec![];

            // Totals for the plan built in this round.
            let mut add_files_size: u64 = 0;
            let mut add_files_count: usize = 0;

            let mut select_level_count = 0;
            // Walk from the new level back down to `levels[0]`.
            for reverse_index in (0..=target_index).rev() {
                let target_tables = &levels[reverse_index].table_infos;

                // The range for the new level itself was computed above; every other level
                // is re-checked against the (possibly widened) overlap info.
                overlap_files_range = if target_index == reverse_index {
                    overlap_files_range
                } else {
                    basic_overlap_info.check_multiple_overlap(target_tables)
                };

                // We allow a layer in the middle without overlap, so we need to continue to
                // the next layer to search for overlap
                if overlap_files_range.is_empty() {
                    // empty level
                    continue;
                }

                for other in &target_tables[overlap_files_range.clone()] {
                    // A pending file ends the whole search; the best plan recorded in
                    // earlier rounds is kept.
                    if level_handler.is_pending_compact(&other.sst_id) {
                        break 'expand_new_level;
                    }
                    // Fold the file into the overlap info so the remaining levels are
                    // checked against the expanded selection.
                    basic_overlap_info.update(other);

                    add_files_size += other.sst_size;
                    add_files_count += 1;
                }

                overlap_levels.push((reverse_index, overlap_files_range.clone()));
                select_level_count += 1;
            }

            // Only a plan spanning strictly more levels replaces the current best.
            if select_level_count > max_select_level_count {
                max_select_level_count = select_level_count;
                pick_levels_range = overlap_levels;
            }

            // When size / file count has exceeded the limit, we need to abandon this plan, it cannot be expanded to the last sub_level
            if max_select_level_count >= self.min_expected_level_count
                && (add_files_size >= self.max_compaction_bytes
                    || add_files_count >= self.max_file_count as usize)
            {
                break 'expand_new_level;
            }
        }

        if !pick_levels_range.is_empty() {
            // Materialise the chosen ranges and accumulate the totals.
            for (reverse_index, sst_range) in pick_levels_range {
                let level_ssts = &levels[reverse_index].table_infos;
                ret.sstable_infos[reverse_index] = level_ssts[sst_range].to_vec();
                ret.total_file_count += ret.sstable_infos[reverse_index].len();
                ret.total_file_size += ret.sstable_infos[reverse_index]
                    .iter()
                    .map(|sst| sst.sst_size)
                    .sum::<u64>();
            }

            // sort sst per level due to reverse expand
            ret.sstable_infos.iter_mut().for_each(|level_ssts| {
                level_ssts.sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            });
        } else {
            // No plan was recorded: fall back to the base SST alone.
            ret.total_file_count = 1;
            ret.total_file_size = sst.sst_size;
            ret.sstable_infos[0].extend(vec![sst.clone()]);
        }

        // Must run before empty levels are removed: the check indexes `levels` by position.
        if self.enable_check_task_level_overlap {
            self.verify_task_level_overlap(&ret, levels);
        }

        ret.sstable_infos.retain(|ssts| !ssts.is_empty());

        // To check whether the task is expected
        if ret.total_file_size > self.max_compaction_bytes
            || ret.total_file_count as u64 > self.max_file_count
            || ret.sstable_infos.len() > self.max_expected_level_count
        {
            // Keep a prefix of the selected levels so the task meets `max_file_count`,
            // `max_compaction_bytes` and `max_expected_level_count`.
            let mut total_file_count = 0;
            let mut total_file_size = 0;
            let mut total_level_count = 0;
            for (index, sstables) in ret.sstable_infos.iter().enumerate() {
                total_file_count += sstables.len();
                total_file_size += sstables.iter().map(|sst| sst.sst_size).sum::<u64>();
                total_level_count += 1;

                // At least `min_expected_level_count` levels should be selected
                if total_level_count >= self.min_expected_level_count
                    && (total_file_count as u64 >= self.max_file_count
                        || total_file_size >= self.max_compaction_bytes
                        || total_level_count >= self.max_expected_level_count)
                {
                    // Cut after the current level and record the totals of the kept prefix.
                    ret.total_file_count = total_file_count;
                    ret.total_file_size = total_file_size;
                    ret.sstable_infos.truncate(index + 1);
                    break;
                }
            }
        }

        ret
    }
392
393    pub fn pick_l0_multi_non_overlap_level(
394        &self,
395        l0: &[Level],
396        level_handler: &LevelHandler,
397    ) -> Vec<SubLevelSstables> {
398        if l0.len() < self.min_expected_level_count {
399            return vec![];
400        }
401
402        let mut scores = vec![];
403        for sst in &l0[0].table_infos {
404            if level_handler.is_pending_compact(&sst.sst_id) {
405                continue;
406            }
407
408            scores.push(self.pick_sub_level(l0, level_handler, sst));
409        }
410
411        if scores.is_empty() {
412            return vec![];
413        }
414
415        let mut expected = Vec::with_capacity(scores.len());
416        let mut unexpected = vec![];
417
418        for mut selected_task in scores {
419            if selected_task.sstable_infos.len() > self.max_expected_level_count
420                || selected_task.sstable_infos.len() < self.min_expected_level_count
421                || selected_task.total_file_size < self.min_compaction_bytes
422            {
423                selected_task.expected = false;
424                unexpected.push(selected_task);
425            } else {
426                selected_task.expected = true;
427                expected.push(selected_task);
428            }
429        }
430
431        // The logic of sorting depends on the interval we expect to select.
432        // 1. contain as many levels as possible
433        // 2. fewer files in the bottom sub level, containing as many smaller intervals as possible.
434        expected.sort_by(|a, b| {
435            b.sstable_infos
436                .len()
437                .cmp(&a.sstable_infos.len())
438                .then_with(|| a.total_file_count.cmp(&b.total_file_count))
439                .then_with(|| a.total_file_size.cmp(&b.total_file_size))
440        });
441
442        // For unexpected tasks, We devised a separate algorithm to evaluate the priority of a task, based on the limit passed in,
443        // we set tasks close to the limit to be high priority, here have three attributes:
444        // 1. The number of levels selected is close to the limit
445        // 2. The number of files selected is close to the limit
446        // 3. The size of the selected file is close to the limit
447        unexpected.sort_by(|a, b| {
448            let a_select_count_offset =
449                (a.sstable_infos.len() as i64 - self.max_expected_level_count as i64).abs();
450            let b_select_count_offset =
451                (b.sstable_infos.len() as i64 - self.max_expected_level_count as i64).abs();
452
453            let a_file_count_offset =
454                (a.total_file_count as i64 - self.max_file_count as i64).abs();
455            let b_file_count_offset =
456                (b.total_file_count as i64 - self.max_file_count as i64).abs();
457
458            let a_file_size_offset =
459                (a.total_file_size as i64 - self.max_compaction_bytes as i64).abs();
460            let b_file_size_offset =
461                (b.total_file_size as i64 - self.max_compaction_bytes as i64).abs();
462
463            a_select_count_offset
464                .cmp(&b_select_count_offset)
465                .then_with(|| a_file_count_offset.cmp(&b_file_count_offset))
466                .then_with(|| a_file_size_offset.cmp(&b_file_size_offset))
467        });
468
469        expected.extend(unexpected);
470
471        expected
472    }
473
474    fn verify_task_level_overlap(&self, ret: &SubLevelSstables, levels: &[Level]) {
475        use std::fmt::Write;
476
477        use itertools::Itertools;
478
479        use crate::hummock::compaction::overlap_strategy::OverlapInfo;
480        let mut overlap_info: Option<Box<dyn OverlapInfo>> = None;
481        for (level_idx, ssts) in ret.sstable_infos.iter().enumerate().rev() {
482            if let Some(overlap_info) = overlap_info.as_mut() {
483                // skip the check if `overlap_info` is not initialized (i.e. the first non-empty level is not met)
484                let level = levels.get(level_idx).unwrap();
485                let overlap_sst_range = overlap_info.check_multiple_overlap(&level.table_infos);
486                if !overlap_sst_range.is_empty() {
487                    let expected_sst_ids = level.table_infos[overlap_sst_range.clone()]
488                        .iter()
489                        .map(|s| s.object_id)
490                        .collect_vec();
491                    let actual_sst_ids = ssts.iter().map(|s| s.object_id).collect_vec();
492                    // `actual_sst_ids` can be larger than `expected_sst_ids` because we may use a larger key range to select SSTs.
493                    // `expected_sst_ids` must be a sub-range of `actual_sst_ids` to ensure correctness.
494                    let start_idx = actual_sst_ids
495                        .iter()
496                        .position(|sst_id| sst_id == expected_sst_ids.first().unwrap());
497                    if start_idx.is_none_or(|idx| {
498                        actual_sst_ids[idx..idx + expected_sst_ids.len()] != expected_sst_ids
499                    }) {
500                        // Print SstableInfo for `actual_sst_ids`
501                        let mut actual_sst_infos = String::new();
502                        ssts.iter()
503                            .for_each(|s| append_sstable_info_to_string(&mut actual_sst_infos, s));
504
505                        // Print SstableInfo for `expected_sst_ids`
506                        let mut expected_sst_infos = String::new();
507                        level.table_infos[overlap_sst_range.clone()]
508                            .iter()
509                            .for_each(|s| {
510                                append_sstable_info_to_string(&mut expected_sst_infos, s)
511                            });
512
513                        // Print SstableInfo for selected ssts in all sub-levels
514                        let mut ret_sst_infos = String::new();
515                        ret.sstable_infos
516                            .iter()
517                            .enumerate()
518                            .for_each(|(idx, ssts)| {
519                                writeln!(
520                                    ret_sst_infos,
521                                    "sub level {}",
522                                    levels.get(idx).unwrap().sub_level_id
523                                )
524                                .unwrap();
525                                ssts.iter().for_each(|s| {
526                                    append_sstable_info_to_string(&mut ret_sst_infos, s)
527                                });
528                            });
529                        panic!(
530                            "Compact task overlap check fails. Actual: {} Expected: {} Ret {}",
531                            actual_sst_infos, expected_sst_infos, ret_sst_infos
532                        );
533                    }
534                }
535            } else if !ssts.is_empty() {
536                // init the `overlap_info` when meeting the first non-empty level.
537                overlap_info = Some(self.overlap_strategy.create_overlap_info());
538            }
539
540            for sst in ssts {
541                overlap_info.as_mut().unwrap().update(sst);
542            }
543        }
544    }
545}
546
547#[cfg(test)]
548pub mod tests {
549    use std::collections::BTreeSet;
550
551    use risingwave_common::config::default::compaction_config;
552
553    use super::*;
554    use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
555    use crate::hummock::compaction::selector::tests::{
556        generate_l0_nonoverlapping_sublevels, generate_table,
557    };
558
    /// Picks three times from level 1 into level 2, registering the first two picks via
    /// `add_pending_task`, and checks which files each pick returns.
    #[test]
    fn test_compact_l1() {
        let mut picker =
            MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
        let levels = vec![
            Level {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(0, 1, 0, 100, 1),
                    generate_table(1, 1, 101, 200, 1),
                    generate_table(2, 1, 222, 300, 1),
                ],
                ..Default::default()
            },
            Level {
                level_idx: 2,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(4, 1, 0, 100, 1),
                    generate_table(5, 1, 101, 150, 1),
                    generate_table(6, 1, 151, 201, 1),
                    generate_table(7, 1, 501, 800, 1),
                    generate_table(8, 2, 301, 400, 1),
                ],
                ..Default::default()
            },
        ];
        let levels = Levels {
            levels,
            l0: generate_l0_nonoverlapping_sublevels(vec![]),
            ..Default::default()
        };
        let mut level_handlers = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
        ];

        // First pick: a file that overlaps nothing in the target level. It means that this
        // file could be trivially moved to the next level.
        let mut local_stats = LocalPickerStatistic::default();
        let ret = picker
            .pick_compaction(&levels, &level_handlers, &mut local_stats)
            .unwrap();
        assert_eq!(ret.input_levels[0].level_idx, 1);
        assert_eq!(ret.target_level, 2);
        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 2);
        assert_eq!(ret.input_levels[1].table_infos.len(), 0);
        // Presumably marks the picked files as pending so the next pick skips them.
        ret.add_pending_task(0, &mut level_handlers);

        // Second pick: file 0 together with the single target file it overlaps.
        let ret = picker
            .pick_compaction(&levels, &level_handlers, &mut local_stats)
            .unwrap();
        assert_eq!(ret.input_levels[0].level_idx, 1);
        assert_eq!(ret.target_level, 2);
        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
        assert_eq!(ret.input_levels[1].table_infos.len(), 1);
        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
        ret.add_pending_task(1, &mut level_handlers);

        // Third pick: the remaining file 1, which overlaps two target files.
        let ret = picker
            .pick_compaction(&levels, &level_handlers, &mut local_stats)
            .unwrap();
        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 1);
        assert_eq!(ret.input_levels[1].table_infos.len(), 2);
        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
    }
630
    /// Checks that the picker groups several adjacent select files that overlap the same
    /// target file into one task.
    #[test]
    fn test_expand_l1_files() {
        let mut picker =
            MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
        let levels = vec![
            Level {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(0, 1, 50, 99, 2),
                    generate_table(1, 1, 100, 149, 2),
                    generate_table(2, 1, 150, 249, 2),
                ],
                ..Default::default()
            },
            Level {
                level_idx: 2,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(4, 1, 50, 199, 1),
                    generate_table(5, 1, 200, 399, 1),
                ],
                ..Default::default()
            },
        ];
        let levels = Levels {
            levels,
            l0: generate_l0_nonoverlapping_sublevels(vec![]),
            ..Default::default()
        };
        let levels_handler = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
        ];

        // Every select file overlaps the target level here, so no trivial move is possible.
        // The pick is expected to be select files 0 and 1 plus target file 4.
        let ret = picker
            .pick_compaction(
                &levels,
                &levels_handler,
                &mut LocalPickerStatistic::default(),
            )
            .unwrap();
        assert_eq!(ret.input_levels[0].level_idx, 1);
        assert_eq!(ret.input_levels[1].level_idx, 2);

        assert_eq!(ret.input_levels[0].table_infos.len(), 2);
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
        assert_eq!(ret.input_levels[0].table_infos[1].sst_id, 1);

        assert_eq!(ret.input_levels[1].table_infos.len(), 1);
        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
    }
686
    /// Runs `pick_l0_multi_non_overlap_level` over four sub-levels with three different limit
    /// configurations. The picker builds one task per non-pending SST of the first
    /// sub-level, so each run is expected to return 6 tasks.
    #[test]
    fn test_pick_l0_multi_level() {
        // The `Level`s below are passed to the picker as L0 sub-levels.
        // NOTE(review): the first argument of `generate_table` looks like the SST id, and 11
        // is used in both the second and the third sub-level — confirm this is intended.
        let levels = vec![
            Level {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(0, 1, 50, 99, 2),
                    generate_table(1, 1, 100, 149, 2),
                    generate_table(2, 1, 150, 249, 2),
                    generate_table(6, 1, 250, 300, 2),
                    generate_table(7, 1, 350, 400, 2),
                    generate_table(8, 1, 450, 500, 2),
                ],
                total_file_size: 800,
                ..Default::default()
            },
            Level {
                level_idx: 2,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(4, 1, 50, 199, 1),
                    generate_table(5, 1, 200, 249, 1),
                    generate_table(9, 1, 250, 300, 2),
                    generate_table(10, 1, 350, 400, 2),
                    generate_table(11, 1, 450, 500, 2),
                ],
                total_file_size: 350,
                ..Default::default()
            },
            Level {
                level_idx: 3,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(11, 1, 250, 300, 2),
                    generate_table(12, 1, 350, 400, 2),
                    generate_table(13, 1, 450, 500, 2),
                ],
                total_file_size: 150,
                ..Default::default()
            },
            Level {
                level_idx: 4,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(14, 1, 250, 300, 2),
                    generate_table(15, 1, 350, 400, 2),
                    generate_table(16, 1, 450, 500, 2),
                ],
                total_file_size: 150,
                ..Default::default()
            },
        ];

        // Only the handler at index 0 is used below.
        let levels_handlers = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
        ];

        {
            // no limit
            let picker = NonOverlapSubLevelPicker::new(
                0,
                10000,
                1,
                10000,
                Arc::new(RangeOverlapStrategy::default()),
                true,
                compaction_config::max_l0_compact_level_count() as usize,
            );
            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
            assert_eq!(6, ret.len());
        }

        {
            // limit max bytes
            let picker = NonOverlapSubLevelPicker::new(
                0,
                100,
                1,
                10000,
                Arc::new(RangeOverlapStrategy::default()),
                true,
                compaction_config::max_l0_compact_level_count() as usize,
            );
            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
            assert_eq!(6, ret.len());
        }

        {
            // limit max file_count
            let picker = NonOverlapSubLevelPicker::new(
                0,
                10000,
                1,
                5,
                Arc::new(RangeOverlapStrategy::default()),
                true,
                compaction_config::max_l0_compact_level_count() as usize,
            );
            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
            assert_eq!(6, ret.len());
        }
    }
792
793    #[test]
794    fn test_pick_l0_multi_level2() {
795        let levels = vec![
796            Level {
797                level_idx: 1,
798                level_type: LevelType::Nonoverlapping,
799                table_infos: vec![
800                    generate_table(0, 1, 50, 99, 2),
801                    generate_table(1, 1, 100, 149, 2),
802                    generate_table(2, 1, 150, 249, 2),
803                    generate_table(6, 1, 250, 300, 2),
804                    generate_table(7, 1, 350, 400, 2),
805                    generate_table(8, 1, 450, 500, 2),
806                ],
807                total_file_size: 800,
808                ..Default::default()
809            },
810            Level {
811                level_idx: 2,
812                level_type: LevelType::Nonoverlapping,
813                table_infos: vec![
814                    generate_table(4, 1, 50, 99, 1),
815                    generate_table(5, 1, 150, 200, 1),
816                    generate_table(9, 1, 250, 300, 2),
817                    generate_table(10, 1, 350, 400, 2),
818                    generate_table(11, 1, 450, 500, 2),
819                ],
820                total_file_size: 250,
821                ..Default::default()
822            },
823            Level {
824                level_idx: 3,
825                level_type: LevelType::Nonoverlapping,
826                table_infos: vec![
827                    generate_table(11, 1, 250, 300, 2),
828                    generate_table(12, 1, 350, 400, 2),
829                    generate_table(13, 1, 450, 500, 2),
830                ],
831                total_file_size: 150,
832                ..Default::default()
833            },
834            Level {
835                level_idx: 4,
836                level_type: LevelType::Nonoverlapping,
837                table_infos: vec![
838                    generate_table(14, 1, 250, 300, 2),
839                    generate_table(15, 1, 350, 400, 2),
840                    generate_table(16, 1, 450, 500, 2),
841                ],
842                total_file_size: 150,
843                ..Default::default()
844            },
845        ];
846
847        let levels_handlers = vec![
848            LevelHandler::new(0),
849            LevelHandler::new(1),
850            LevelHandler::new(2),
851        ];
852
853        {
854            // no limit
855            let picker = NonOverlapSubLevelPicker::new(
856                0,
857                10000,
858                1,
859                10000,
860                Arc::new(RangeOverlapStrategy::default()),
861                true,
862                compaction_config::max_l0_compact_level_count() as usize,
863            );
864            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
865            assert_eq!(6, ret.len());
866        }
867
868        {
869            // limit max bytes
870            let max_compaction_bytes = 100;
871            let picker = NonOverlapSubLevelPicker::new(
872                60,
873                max_compaction_bytes,
874                1,
875                10000,
876                Arc::new(RangeOverlapStrategy::default()),
877                true,
878                compaction_config::max_l0_compact_level_count() as usize,
879            );
880            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
881            assert_eq!(6, ret.len());
882
883            for plan in &ret {
884                if plan.total_file_size >= max_compaction_bytes {
885                    assert!(plan.expected);
886                } else {
887                    assert!(!plan.expected);
888                }
889            }
890        }
891
892        {
893            // limit max file_count
894            let max_file_count = 2;
895            let picker = NonOverlapSubLevelPicker::new(
896                0,
897                10000,
898                1,
899                max_file_count,
900                Arc::new(RangeOverlapStrategy::default()),
901                true,
902                compaction_config::max_l0_compact_level_count() as usize,
903            );
904            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
905            assert_eq!(6, ret.len());
906
907            for plan in ret {
908                let mut sst_id_set = BTreeSet::default();
909                for sst in &plan.sstable_infos {
910                    sst_id_set.insert(sst[0].sst_id);
911                }
912                assert!(sst_id_set.len() <= max_file_count as usize);
913            }
914        }
915
916        {
917            // limit min_depth
918            let min_depth = 3;
919            let picker = NonOverlapSubLevelPicker::new(
920                10,
921                10000,
922                min_depth,
923                100,
924                Arc::new(RangeOverlapStrategy::default()),
925                true,
926                compaction_config::max_l0_compact_level_count() as usize,
927            );
928            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
929            assert_eq!(6, ret.len());
930
931            for plan in ret {
932                if plan.sstable_infos.len() >= min_depth {
933                    assert!(plan.expected);
934                } else {
935                    assert!(!plan.expected);
936                }
937            }
938        }
939    }
940
941    #[test]
942    fn test_trivial_move_bug() {
943        let levels = [
944            Level {
945                level_idx: 1,
946                level_type: LevelType::Nonoverlapping,
947                table_infos: vec![generate_table(0, 1, 400, 500, 2)],
948                total_file_size: 100,
949                ..Default::default()
950            },
951            Level {
952                level_idx: 2,
953                level_type: LevelType::Nonoverlapping,
954                table_infos: vec![
955                    generate_table(1, 1, 100, 200, 1),
956                    generate_table(2, 1, 600, 700, 1),
957                ],
958                total_file_size: 200,
959                ..Default::default()
960            },
961            Level {
962                level_idx: 3,
963                level_type: LevelType::Nonoverlapping,
964                table_infos: vec![
965                    generate_table(3, 1, 100, 300, 2),
966                    generate_table(4, 1, 600, 800, 1),
967                ],
968                total_file_size: 400,
969                ..Default::default()
970            },
971        ];
972
973        let levels_handlers = vec![
974            LevelHandler::new(0),
975            LevelHandler::new(1),
976            LevelHandler::new(2),
977            LevelHandler::new(3),
978        ];
979        // no limit
980        let picker =
981            MinOverlappingPicker::new(2, 3, 1000, 0, Arc::new(RangeOverlapStrategy::default()));
982        let (select_files, target_files) = picker.pick_tables(
983            &levels[1].table_infos,
984            &levels[2].table_infos,
985            &levels_handlers,
986        );
987        let overlap_strategy = Arc::new(RangeOverlapStrategy::default());
988        let mut overlap_info = overlap_strategy.create_overlap_info();
989        for sst in &select_files {
990            overlap_info.update(sst);
991        }
992        let range = overlap_info.check_multiple_overlap(&levels[0].table_infos);
993        assert!(range.is_empty());
994        assert_eq!(select_files.len(), 1);
995        assert_eq!(target_files.len(), 1);
996    }
997
998    #[test]
999    fn test_pick_unexpected_task() {
1000        let levels = vec![
1001            Level {
1002                level_idx: 1,
1003                level_type: LevelType::Nonoverlapping,
1004                table_infos: vec![generate_table(0, 1, 50, 100, 2)], // 50
1005                total_file_size: 50,
1006                ..Default::default()
1007            },
1008            Level {
1009                level_idx: 2,
1010                level_type: LevelType::Nonoverlapping,
1011                table_infos: vec![
1012                    generate_table(1, 1, 101, 150, 1), // 50
1013                ],
1014                total_file_size: 50,
1015                ..Default::default()
1016            },
1017            Level {
1018                level_idx: 3,
1019                level_type: LevelType::Nonoverlapping,
1020                table_infos: vec![
1021                    generate_table(2, 1, 151, 200, 2), // 50
1022                ],
1023                total_file_size: 50,
1024                ..Default::default()
1025            },
1026            Level {
1027                level_idx: 4,
1028                level_type: LevelType::Nonoverlapping,
1029                table_infos: vec![
1030                    generate_table(3, 1, 50, 300, 2), // 250
1031                ],
1032                total_file_size: 250,
1033                ..Default::default()
1034            },
1035        ];
1036
1037        let levels_handlers = vec![
1038            LevelHandler::new(0),
1039            LevelHandler::new(1),
1040            LevelHandler::new(2),
1041            LevelHandler::new(3),
1042            LevelHandler::new(4),
1043        ];
1044
1045        {
1046            // no limit
1047            let picker = NonOverlapSubLevelPicker::new(
1048                0,
1049                10000,
1050                1,
1051                10000,
1052                Arc::new(RangeOverlapStrategy::default()),
1053                true,
1054                compaction_config::max_l0_compact_level_count() as usize,
1055            );
1056            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1057            {
1058                let plan = &ret[0];
1059                assert_eq!(4, plan.sstable_infos.len());
1060            }
1061        }
1062
1063        {
1064            // limit size
1065            let picker = NonOverlapSubLevelPicker::new(
1066                0,
1067                150,
1068                1,
1069                10000,
1070                Arc::new(RangeOverlapStrategy::default()),
1071                true,
1072                compaction_config::max_l0_compact_level_count() as usize,
1073            );
1074            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1075            {
1076                let plan = &ret[0];
1077                assert_eq!(3, plan.sstable_infos.len());
1078            }
1079        }
1080
1081        {
1082            // limit count
1083            let picker = NonOverlapSubLevelPicker::new(
1084                0,
1085                10000,
1086                1,
1087                3,
1088                Arc::new(RangeOverlapStrategy::default()),
1089                true,
1090                compaction_config::max_l0_compact_level_count() as usize,
1091            );
1092            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1093            {
1094                let plan = &ret[0];
1095                assert_eq!(3, plan.sstable_infos.len());
1096            }
1097        }
1098
1099        {
1100            // limit expected level count
1101            let max_expected_level_count = 3;
1102            let picker = NonOverlapSubLevelPicker::for_test(
1103                0,
1104                10000,
1105                1,
1106                3,
1107                Arc::new(RangeOverlapStrategy::default()),
1108                true,
1109                max_expected_level_count,
1110            );
1111            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1112            {
1113                let plan = &ret[0];
1114                assert_eq!(max_expected_level_count, plan.sstable_infos.len());
1115
1116                assert_eq!(0, plan.sstable_infos[0][0].sst_id);
1117                assert_eq!(1, plan.sstable_infos[1][0].sst_id);
1118                assert_eq!(2, plan.sstable_infos[2][0].sst_id);
1119            }
1120        }
1121
1122        {
1123            // limit min_compacaion_bytes
1124            let max_expected_level_count = 100;
1125            let picker = NonOverlapSubLevelPicker::for_test(
1126                1000,
1127                10000,
1128                1,
1129                100,
1130                Arc::new(RangeOverlapStrategy::default()),
1131                true,
1132                max_expected_level_count,
1133            );
1134            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1135            {
1136                let plan = &ret[0];
1137
1138                assert_eq!(0, plan.sstable_infos[0][0].sst_id);
1139                assert_eq!(1, plan.sstable_infos[1][0].sst_id);
1140                assert_eq!(2, plan.sstable_infos[2][0].sst_id);
1141                assert_eq!(3, plan.sstable_infos[3][0].sst_id);
1142                assert!(!plan.expected);
1143            }
1144        }
1145    }
1146}