risingwave_meta/hummock/compaction/picker/
non_overlap_sub_level_picker.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Write;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_hummock_sdk::append_sstable_info_to_string;
20use risingwave_hummock_sdk::key_range::KeyRange;
21use risingwave_hummock_sdk::level::Level;
22use risingwave_hummock_sdk::sstable_info::SstableInfo;
23use risingwave_pb::hummock::LevelType;
24
25use crate::hummock::compaction::overlap_strategy::{OverlapInfo, OverlapStrategy};
26use crate::hummock::level_handler::LevelHandler;
27
28#[derive(Default, Debug)]
29pub struct SubLevelSstables {
30    pub total_file_size: u64,
31    pub total_file_count: usize,
32    pub sstable_infos: Vec<(u64, Vec<SstableInfo>)>,
33    pub expected: bool,
34}
35
36pub struct NonOverlapSubLevelPicker {
37    min_compaction_bytes: u64,
38    max_compaction_bytes: u64,
39    min_expected_level_count: usize,
40    max_file_count: u64,
41    overlap_strategy: Arc<dyn OverlapStrategy>,
42    enable_check_task_level_overlap: bool,
43    max_expected_level_count: usize,
44    enable_optimize_l0_interval_selection: bool,
45}
46
47impl NonOverlapSubLevelPicker {
48    /// Pick the sub-level used to iterate candidate intervals:
49    /// - ignore overlapping sub-levels
50    /// - without optimization: choose the first non-overlapping sub-level
51    /// - with optimization: choose the non-overlapping sub-level with the most non-pending sstables
52    ///   (then by total file count, then by index)
53    ///   Returns `None` when no non-overlapping sub-level exists.
54    fn select_intervals<'a>(
55        &self,
56        l0: &'a [Level],
57        level_handler: &LevelHandler,
58    ) -> Option<&'a [SstableInfo]> {
59        if !self.enable_optimize_l0_interval_selection {
60            return l0.first().map(|level| {
61                debug_assert_eq!(level.level_type, LevelType::Nonoverlapping);
62                level.table_infos.as_slice()
63            });
64        }
65
66        // Directly find the best non-overlapping level without allocating intermediate Vec
67        let level_idx = l0
68            .iter()
69            .enumerate()
70            .filter(|(_, level)| level.level_type == LevelType::Nonoverlapping)
71            .max_by(|(idx1, levels1), (idx2, levels2)| {
72                let non_pending1 = levels1
73                    .table_infos
74                    .iter()
75                    .filter(|sst| !level_handler.is_pending_compact(&sst.sst_id))
76                    .count();
77                let non_pending2 = levels2
78                    .table_infos
79                    .iter()
80                    .filter(|sst| !level_handler.is_pending_compact(&sst.sst_id))
81                    .count();
82
83                non_pending1
84                    .cmp(&non_pending2)
85                    .then_with(|| levels1.table_infos.len().cmp(&levels2.table_infos.len()))
86                    .then(idx2.cmp(idx1))
87            })
88            .map(|(idx, _)| idx)?;
89
90        Some(&l0[level_idx].table_infos)
91    }
92
93    pub fn new(
94        min_compaction_bytes: u64,
95        max_compaction_bytes: u64,
96        min_expected_level_count: usize,
97        max_file_count: u64,
98        overlap_strategy: Arc<dyn OverlapStrategy>,
99        enable_check_task_level_overlap: bool,
100        max_expected_level_count: usize,
101        enable_optimize_l0_interval_selection: bool,
102    ) -> Self {
103        Self {
104            min_compaction_bytes,
105            max_compaction_bytes,
106            min_expected_level_count,
107            max_file_count,
108            overlap_strategy,
109            enable_check_task_level_overlap,
110            max_expected_level_count,
111            enable_optimize_l0_interval_selection,
112        }
113    }
114
115    #[cfg(test)]
116    pub fn for_test(
117        min_compaction_bytes: u64,
118        max_compaction_bytes: u64,
119        min_expected_level_count: usize,
120        max_file_count: u64,
121        overlap_strategy: Arc<dyn OverlapStrategy>,
122        enable_check_task_level_overlap: bool,
123        max_expected_level_count: usize,
124        enable_optimize_l0_interval_selection: bool,
125    ) -> Self {
126        Self {
127            min_compaction_bytes,
128            max_compaction_bytes,
129            min_expected_level_count,
130            max_file_count,
131            overlap_strategy,
132            enable_check_task_level_overlap,
133            max_expected_level_count,
134            enable_optimize_l0_interval_selection,
135        }
136    }
137
138    /// Selects overlapping SSTs across multiple L0 sub-levels for a given key range.
139    ///
140    /// For each possible target sub-level, expands backwards (toward older sub-levels)
141    /// to collect overlapping SSTs. Returns the expansion covering the most sub-levels
142    /// within configured size and count limits.
143    ///
144    /// # Arguments
145    ///
146    /// * `levels` - L0 sub-levels ordered from oldest to newest
147    /// * `level_handler` - Tracks which SSTs are currently being compacted
148    /// * `key_range` - Initial key range to search for overlaps
149    ///
150    /// # Returns
151    ///
152    /// `Some` with selected SSTs grouped by sub-level, or `None` if no valid selection exists.
153    fn pick_sub_level(
154        &self,
155        levels: &[Level],
156        level_handler: &LevelHandler,
157        key_range: &KeyRange,
158    ) -> Option<SubLevelSstables> {
159        let mut ret = SubLevelSstables::default();
160        for sub_level in levels {
161            ret.sstable_infos.push((sub_level.sub_level_id, vec![]));
162        }
163
164        let mut pick_levels_range = Vec::default();
165        let mut max_select_level_count = 0;
166
167        // Pay attention to the order here: Make sure to select the lowest sub_level to meet the requirements of base compaction. If you break the assumption of this order, you need to redesign it.
168        // TODO: Use binary selection to replace the step algorithm to optimize algorithm complexity
169        'expand_new_level: for (target_index, target_level) in levels.iter().enumerate() {
170            if target_level.level_type != LevelType::Nonoverlapping {
171                break;
172            }
173
174            if ret
175                .sstable_infos
176                .iter()
177                .filter(|(_sub_level_id, ssts)| !ssts.is_empty())
178                .count()
179                > self.max_expected_level_count
180            {
181                break;
182            }
183
184            // reset the `basic_overlap_info` with basic sst
185            let mut basic_overlap_info = self.overlap_strategy.create_overlap_info();
186            basic_overlap_info.update(key_range);
187
188            let mut overlap_files_range =
189                basic_overlap_info.check_multiple_include(&target_level.table_infos);
190            if overlap_files_range.is_empty() {
191                overlap_files_range =
192                    basic_overlap_info.check_multiple_overlap(&target_level.table_infos);
193            }
194
195            if overlap_files_range.is_empty() {
196                continue;
197            }
198
199            let mut overlap_levels = vec![];
200
201            let mut add_files_size: u64 = 0;
202            let mut add_files_count: usize = 0;
203
204            let mut select_level_count = 0;
205            for reverse_index in (0..=target_index).rev() {
206                let target_tables = &levels[reverse_index].table_infos;
207
208                overlap_files_range = if target_index == reverse_index {
209                    overlap_files_range
210                } else {
211                    basic_overlap_info.check_multiple_overlap(target_tables)
212                };
213
214                // We allow a layer in the middle without overlap, so we need to continue to
215                // the next layer to search for overlap
216                if overlap_files_range.is_empty() {
217                    // empty level
218                    continue;
219                }
220
221                for other in &target_tables[overlap_files_range.clone()] {
222                    if level_handler.is_pending_compact(&other.sst_id) {
223                        break 'expand_new_level;
224                    }
225                    basic_overlap_info.update(&other.key_range);
226
227                    add_files_size += other.sst_size;
228                    add_files_count += 1;
229                }
230
231                overlap_levels.push((reverse_index, overlap_files_range.clone()));
232                select_level_count += 1;
233            }
234
235            if select_level_count > max_select_level_count {
236                max_select_level_count = select_level_count;
237                pick_levels_range = overlap_levels;
238            }
239
240            // When size / file count has exceeded the limit, we need to abandon this plan, it cannot be expanded to the last sub_level
241            if max_select_level_count >= self.min_expected_level_count
242                && (add_files_size >= self.max_compaction_bytes
243                    || add_files_count >= self.max_file_count as usize)
244            {
245                break 'expand_new_level;
246            }
247        }
248
249        if !pick_levels_range.is_empty() {
250            for (reverse_index, sst_range) in pick_levels_range {
251                let level_ssts = &levels[reverse_index].table_infos;
252                ret.sstable_infos[reverse_index].1 = level_ssts[sst_range].to_vec();
253                ret.total_file_count += ret.sstable_infos[reverse_index].1.len();
254                ret.total_file_size += ret.sstable_infos[reverse_index]
255                    .1
256                    .iter()
257                    .map(|sst| sst.sst_size)
258                    .sum::<u64>();
259            }
260
261            // sort sst per level due to reverse expand
262            ret.sstable_infos
263                .iter_mut()
264                .for_each(|(_sub_level_id, level_ssts)| {
265                    level_ssts.sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
266                });
267        } else {
268            ret.total_file_count = 0;
269            ret.total_file_size = 0;
270        }
271
272        if self.enable_check_task_level_overlap {
273            self.verify_task_level_overlap(&ret, levels);
274        }
275
276        ret.sstable_infos
277            .retain(|(_sub_level_id, ssts)| !ssts.is_empty());
278
279        // To check whether the task is expected
280        if ret.total_file_size > self.max_compaction_bytes
281            || ret.total_file_count as u64 > self.max_file_count
282            || ret.sstable_infos.len() > self.max_expected_level_count
283        {
284            // rotate the sstables to meet the `max_file_count` and `max_compaction_bytes` and `max_expected_level_count`
285            let mut total_file_count = 0;
286            let mut total_file_size = 0;
287            let mut total_level_count = 0;
288            for (index, (_sub_level_id, sstables)) in ret.sstable_infos.iter().enumerate() {
289                total_file_count += sstables.len();
290                total_file_size += sstables.iter().map(|sst| sst.sst_size).sum::<u64>();
291                total_level_count += 1;
292
293                // Atleast `min_expected_level_count`` level should be selected
294                if total_level_count >= self.min_expected_level_count
295                    && (total_file_count as u64 >= self.max_file_count
296                        || total_file_size >= self.max_compaction_bytes
297                        || total_level_count >= self.max_expected_level_count)
298                {
299                    ret.total_file_count = total_file_count;
300                    ret.total_file_size = total_file_size;
301                    ret.sstable_infos.truncate(index + 1);
302                    break;
303                }
304            }
305        }
306
307        if ret.sstable_infos.is_empty() {
308            return None;
309        }
310
311        Some(ret)
312    }
313
314    /// Generates multiple compaction task candidates from L0 sub-levels.
315    ///
316    /// Iterates through candidate SSTs from a selected interval sub-level, calls
317    /// `pick_sub_level` for each to form tasks, then classifies and sorts them
318    /// by priority ("expected" tasks before "unexpected" ones).
319    ///
320    /// # Arguments
321    ///
322    /// * `l0` - L0 sub-levels ordered from oldest to newest
323    /// * `level_handler` - Tracks which SSTs are currently being compacted
324    ///
325    /// # Returns
326    ///
327    /// Sorted vector of task candidates, or empty if insufficient sub-levels exist.
328    /// Tasks are marked "expected" when satisfying configured size and level count thresholds.
329    pub fn pick_l0_multi_non_overlap_level(
330        &self,
331        l0: &[Level],
332        level_handler: &LevelHandler,
333    ) -> Vec<SubLevelSstables> {
334        if l0.len() < self.min_expected_level_count {
335            return vec![];
336        }
337
338        let mut scores = vec![];
339        let Some(intervals) = self.select_intervals(l0, level_handler) else {
340            return vec![];
341        };
342        for sst in intervals {
343            if level_handler.is_pending_compact(&sst.sst_id) {
344                continue;
345            }
346
347            if let Some(score) = self.pick_sub_level(l0, level_handler, &sst.key_range) {
348                scores.push(score);
349            }
350        }
351
352        if scores.is_empty() {
353            return vec![];
354        }
355
356        let mut expected = Vec::with_capacity(scores.len());
357        let mut unexpected = vec![];
358
359        for mut selected_task in scores {
360            if selected_task.sstable_infos.len() > self.max_expected_level_count
361                || selected_task.sstable_infos.len() < self.min_expected_level_count
362                || selected_task.total_file_size < self.min_compaction_bytes
363            {
364                selected_task.expected = false;
365                unexpected.push(selected_task);
366            } else {
367                selected_task.expected = true;
368                expected.push(selected_task);
369            }
370        }
371
372        // The logic of sorting depends on the interval we expect to select.
373        // 1. contain as many levels as possible
374        // 2. fewer files in the bottom sub level, containing as many smaller intervals as possible.
375        expected.sort_by(|a, b| {
376            b.sstable_infos
377                .len()
378                .cmp(&a.sstable_infos.len())
379                .then_with(|| a.total_file_count.cmp(&b.total_file_count))
380                .then_with(|| a.total_file_size.cmp(&b.total_file_size))
381        });
382
383        // For unexpected tasks, We devised a separate algorithm to evaluate the priority of a task, based on the limit passed in,
384        // we set tasks close to the limit to be high priority, here have three attributes:
385        // 1. The number of levels selected is close to the limit
386        // 2. The number of files selected is close to the limit
387        // 3. The size of the selected file is close to the limit
388        unexpected.sort_by(|a, b| {
389            let a_select_count_offset =
390                (a.sstable_infos.len() as i64 - self.max_expected_level_count as i64).abs();
391            let b_select_count_offset =
392                (b.sstable_infos.len() as i64 - self.max_expected_level_count as i64).abs();
393
394            let a_file_count_offset =
395                (a.total_file_count as i64 - self.max_file_count as i64).abs();
396            let b_file_count_offset =
397                (b.total_file_count as i64 - self.max_file_count as i64).abs();
398
399            let a_file_size_offset =
400                (a.total_file_size as i64 - self.max_compaction_bytes as i64).abs();
401            let b_file_size_offset =
402                (b.total_file_size as i64 - self.max_compaction_bytes as i64).abs();
403
404            a_select_count_offset
405                .cmp(&b_select_count_offset)
406                .then_with(|| a_file_count_offset.cmp(&b_file_count_offset))
407                .then_with(|| a_file_size_offset.cmp(&b_file_size_offset))
408        });
409
410        expected.extend(unexpected);
411
412        expected
413    }
414
415    fn verify_task_level_overlap(&self, ret: &SubLevelSstables, levels: &[Level]) {
416        let mut overlap_info: Option<Box<dyn OverlapInfo>> = None;
417        for (level_idx, (_sub_level_id, ssts)) in ret.sstable_infos.iter().enumerate().rev() {
418            if let Some(overlap_info) = overlap_info.as_mut() {
419                // skip the check if `overlap_info` is not initialized (i.e. the first non-empty level is not met)
420                let level = levels.get(level_idx).unwrap();
421                let overlap_sst_range = overlap_info.check_multiple_overlap(&level.table_infos);
422                if !overlap_sst_range.is_empty() {
423                    let expected_sst_ids = level.table_infos[overlap_sst_range.clone()]
424                        .iter()
425                        .map(|s| s.object_id)
426                        .collect_vec();
427                    let actual_sst_ids = ssts.iter().map(|s| s.object_id).collect_vec();
428                    // `actual_sst_ids` can be larger than `expected_sst_ids` because we may use a larger key range to select SSTs.
429                    // `expected_sst_ids` must be a sub-range of `actual_sst_ids` to ensure correctness.
430                    let start_idx = actual_sst_ids
431                        .iter()
432                        .position(|sst_id| sst_id == expected_sst_ids.first().unwrap());
433                    if start_idx.is_none_or(|idx| {
434                        actual_sst_ids[idx..idx + expected_sst_ids.len()] != expected_sst_ids
435                    }) {
436                        // Print SstableInfo for `actual_sst_ids`
437                        let mut actual_sst_infos = String::new();
438                        ssts.iter()
439                            .for_each(|s| append_sstable_info_to_string(&mut actual_sst_infos, s));
440
441                        // Print SstableInfo for `expected_sst_ids`
442                        let mut expected_sst_infos = String::new();
443                        level.table_infos[overlap_sst_range].iter().for_each(|s| {
444                            append_sstable_info_to_string(&mut expected_sst_infos, s)
445                        });
446
447                        // Print SstableInfo for selected ssts in all sub-levels
448                        let mut ret_sst_infos = String::new();
449                        ret.sstable_infos.iter().enumerate().for_each(
450                            |(idx, (_sub_level_id, ssts))| {
451                                writeln!(
452                                    ret_sst_infos,
453                                    "sub level {}",
454                                    levels.get(idx).unwrap().sub_level_id
455                                )
456                                .unwrap();
457                                ssts.iter().for_each(|s| {
458                                    append_sstable_info_to_string(&mut ret_sst_infos, s)
459                                });
460                            },
461                        );
462                        panic!(
463                            "Compact task overlap check fails. Actual: {} Expected: {} Ret {}",
464                            actual_sst_infos, expected_sst_infos, ret_sst_infos
465                        );
466                    }
467                }
468            } else if !ssts.is_empty() {
469                // init the `overlap_info` when meeting the first non-empty level.
470                overlap_info = Some(self.overlap_strategy.create_overlap_info());
471            }
472
473            for sst in ssts {
474                overlap_info.as_mut().unwrap().update(&sst.key_range);
475            }
476        }
477    }
478}
479
480#[cfg(test)]
481pub mod tests {
482    use std::collections::BTreeSet;
483
484    use risingwave_common::config::meta::default::compaction_config;
485
486    use super::*;
487    use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
488    use crate::hummock::compaction::picker::non_overlap_sub_level_picker::NonOverlapSubLevelPicker;
489    use crate::hummock::compaction::selector::tests::generate_table;
490
491    #[test]
492    fn test_pick_l0_multi_level() {
493        let levels = vec![
494            Level {
495                level_idx: 1,
496                level_type: LevelType::Nonoverlapping,
497                table_infos: vec![
498                    generate_table(0, 1, 50, 99, 2),
499                    generate_table(1, 1, 100, 149, 2),
500                    generate_table(2, 1, 150, 249, 2),
501                    generate_table(6, 1, 250, 300, 2),
502                    generate_table(7, 1, 350, 400, 2),
503                    generate_table(8, 1, 450, 500, 2),
504                ],
505                total_file_size: 800,
506                ..Default::default()
507            },
508            Level {
509                level_idx: 2,
510                level_type: LevelType::Nonoverlapping,
511                table_infos: vec![
512                    generate_table(4, 1, 50, 199, 1),
513                    generate_table(5, 1, 200, 249, 1),
514                    generate_table(9, 1, 250, 300, 2),
515                    generate_table(10, 1, 350, 400, 2),
516                    generate_table(11, 1, 450, 500, 2),
517                ],
518                total_file_size: 350,
519                ..Default::default()
520            },
521            Level {
522                level_idx: 3,
523                level_type: LevelType::Nonoverlapping,
524                table_infos: vec![
525                    generate_table(11, 1, 250, 300, 2),
526                    generate_table(12, 1, 350, 400, 2),
527                    generate_table(13, 1, 450, 500, 2),
528                ],
529                total_file_size: 150,
530                ..Default::default()
531            },
532            Level {
533                level_idx: 4,
534                level_type: LevelType::Nonoverlapping,
535                table_infos: vec![
536                    generate_table(14, 1, 250, 300, 2),
537                    generate_table(15, 1, 350, 400, 2),
538                    generate_table(16, 1, 450, 500, 2),
539                ],
540                total_file_size: 150,
541                ..Default::default()
542            },
543        ];
544
545        let levels_handlers = [
546            LevelHandler::new(0),
547            LevelHandler::new(1),
548            LevelHandler::new(2),
549        ];
550
551        {
552            // no limit
553            let picker = NonOverlapSubLevelPicker::new(
554                0,
555                10000,
556                1,
557                10000,
558                Arc::new(RangeOverlapStrategy::default()),
559                true,
560                compaction_config::max_l0_compact_level_count() as usize,
561                true,
562            );
563            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
564            assert_eq!(6, ret.len());
565        }
566
567        {
568            // limit max bytes
569            let picker = NonOverlapSubLevelPicker::new(
570                0,
571                100,
572                1,
573                10000,
574                Arc::new(RangeOverlapStrategy::default()),
575                true,
576                compaction_config::max_l0_compact_level_count() as usize,
577                true,
578            );
579            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
580            assert_eq!(6, ret.len());
581        }
582
583        {
584            // limit max file_count
585            let picker = NonOverlapSubLevelPicker::new(
586                0,
587                10000,
588                1,
589                5,
590                Arc::new(RangeOverlapStrategy::default()),
591                true,
592                compaction_config::max_l0_compact_level_count() as usize,
593                true,
594            );
595            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
596            assert_eq!(6, ret.len());
597        }
598    }
599
600    #[test]
601    fn test_pick_l0_multi_level2() {
602        let levels = vec![
603            Level {
604                level_idx: 1,
605                level_type: LevelType::Nonoverlapping,
606                table_infos: vec![
607                    generate_table(0, 1, 50, 99, 2),
608                    generate_table(1, 1, 100, 149, 2),
609                    generate_table(2, 1, 150, 249, 2),
610                    generate_table(6, 1, 250, 300, 2),
611                    generate_table(7, 1, 350, 400, 2),
612                    generate_table(8, 1, 450, 500, 2),
613                ],
614                total_file_size: 800,
615                ..Default::default()
616            },
617            Level {
618                level_idx: 2,
619                level_type: LevelType::Nonoverlapping,
620                table_infos: vec![
621                    generate_table(4, 1, 50, 99, 1),
622                    generate_table(5, 1, 150, 200, 1),
623                    generate_table(9, 1, 250, 300, 2),
624                    generate_table(10, 1, 350, 400, 2),
625                    generate_table(11, 1, 450, 500, 2),
626                ],
627                total_file_size: 250,
628                ..Default::default()
629            },
630            Level {
631                level_idx: 3,
632                level_type: LevelType::Nonoverlapping,
633                table_infos: vec![
634                    generate_table(11, 1, 250, 300, 2),
635                    generate_table(12, 1, 350, 400, 2),
636                    generate_table(13, 1, 450, 500, 2),
637                ],
638                total_file_size: 150,
639                ..Default::default()
640            },
641            Level {
642                level_idx: 4,
643                level_type: LevelType::Nonoverlapping,
644                table_infos: vec![
645                    generate_table(14, 1, 250, 300, 2),
646                    generate_table(15, 1, 350, 400, 2),
647                    generate_table(16, 1, 450, 500, 2),
648                ],
649                total_file_size: 150,
650                ..Default::default()
651            },
652        ];
653
654        let levels_handlers = [
655            LevelHandler::new(0),
656            LevelHandler::new(1),
657            LevelHandler::new(2),
658        ];
659
660        {
661            // no limit
662            let picker = NonOverlapSubLevelPicker::new(
663                0,
664                10000,
665                1,
666                10000,
667                Arc::new(RangeOverlapStrategy::default()),
668                true,
669                compaction_config::max_l0_compact_level_count() as usize,
670                true,
671            );
672            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
673            assert_eq!(6, ret.len());
674        }
675
676        {
677            // limit max bytes
678            let max_compaction_bytes = 100;
679            let picker = NonOverlapSubLevelPicker::new(
680                60,
681                max_compaction_bytes,
682                1,
683                10000,
684                Arc::new(RangeOverlapStrategy::default()),
685                true,
686                compaction_config::max_l0_compact_level_count() as usize,
687                true,
688            );
689            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
690            assert_eq!(6, ret.len());
691
692            for plan in &ret {
693                if plan.total_file_size >= max_compaction_bytes {
694                    assert!(plan.expected);
695                } else {
696                    assert!(!plan.expected);
697                }
698            }
699        }
700
701        {
702            // limit max file_count
703            let max_file_count = 2;
704            let picker = NonOverlapSubLevelPicker::new(
705                0,
706                10000,
707                1,
708                max_file_count,
709                Arc::new(RangeOverlapStrategy::default()),
710                true,
711                compaction_config::max_l0_compact_level_count() as usize,
712                true,
713            );
714            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
715            assert_eq!(6, ret.len());
716
717            for plan in ret {
718                let mut sst_id_set = BTreeSet::default();
719                for sst in &plan.sstable_infos {
720                    sst_id_set.insert(sst.1[0].sst_id);
721                }
722                assert!(sst_id_set.len() <= max_file_count as usize);
723            }
724        }
725
726        {
727            // limit min_depth
728            let min_depth = 3;
729            let picker = NonOverlapSubLevelPicker::new(
730                10,
731                10000,
732                min_depth,
733                100,
734                Arc::new(RangeOverlapStrategy::default()),
735                true,
736                compaction_config::max_l0_compact_level_count() as usize,
737                true,
738            );
739            let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
740            assert_eq!(6, ret.len());
741
742            for plan in ret {
743                if plan.sstable_infos.len() >= min_depth {
744                    assert!(plan.expected);
745                } else {
746                    assert!(!plan.expected);
747                }
748            }
749        }
750    }
751
/// Runs `pick_l0_multi_non_overlap_level` over four single-table, non-overlapping
/// sub-levels with differently configured pickers and checks the first returned plan.
#[test]
fn test_pick_unexpected_task() {
    // Builds a non-overlapping sub-level that holds exactly one table.
    let single_table_level = |level_idx, table, total_file_size| Level {
        level_idx,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![table],
        total_file_size,
        ..Default::default()
    };
    let levels = vec![
        single_table_level(1, generate_table(0, 1, 50, 100, 2), 50),
        single_table_level(2, generate_table(1, 1, 101, 150, 1), 50),
        single_table_level(3, generate_table(2, 1, 151, 200, 2), 50),
        single_table_level(4, generate_table(3, 1, 50, 300, 2), 250),
    ];

    // One handler per level index 0..=4; only the first one is handed to the picker.
    let levels_handlers = [0, 1, 2, 3, 4].map(LevelHandler::new);

    let default_level_count = compaction_config::max_l0_compact_level_count() as usize;

    // Every scenario runs its picker against the same levels and handler.
    let pick = |picker: NonOverlapSubLevelPicker| {
        picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0])
    };

    // no limit: the first plan is expected to contain all four sub-levels
    let plans = pick(NonOverlapSubLevelPicker::new(
        0,
        10000,
        1,
        10000,
        Arc::new(RangeOverlapStrategy::default()),
        true,
        default_level_count,
        true,
    ));
    assert_eq!(4, plans[0].sstable_infos.len());

    // limit size: the first plan is expected to stop after three sub-levels
    let plans = pick(NonOverlapSubLevelPicker::new(
        0,
        150,
        1,
        10000,
        Arc::new(RangeOverlapStrategy::default()),
        true,
        default_level_count,
        true,
    ));
    assert_eq!(3, plans[0].sstable_infos.len());

    // limit count: the first plan is expected to stop after three sub-levels
    let plans = pick(NonOverlapSubLevelPicker::new(
        0,
        10000,
        1,
        3,
        Arc::new(RangeOverlapStrategy::default()),
        true,
        default_level_count,
        true,
    ));
    assert_eq!(3, plans[0].sstable_infos.len());

    // limit expected level count: the plan depth must equal the configured maximum and
    // the selected sub-levels must start with sst 0, 1 and 2, in that order
    let max_expected_level_count = 3;
    let plans = pick(NonOverlapSubLevelPicker::for_test(
        0,
        10000,
        1,
        3,
        Arc::new(RangeOverlapStrategy::default()),
        true,
        max_expected_level_count,
        true,
    ));
    let picked = &plans[0].sstable_infos;
    assert_eq!(max_expected_level_count, picked.len());
    assert_eq!(0, picked[0].1[0].sst_id);
    assert_eq!(1, picked[1].1[0].sst_id);
    assert_eq!(2, picked[2].1[0].sst_id);

    // limit min_compaction_bytes: all four sub-levels are still selected, but the plan
    // is flagged as not expected
    let max_expected_level_count = 100;
    let plans = pick(NonOverlapSubLevelPicker::for_test(
        1000,
        10000,
        1,
        100,
        Arc::new(RangeOverlapStrategy::default()),
        true,
        max_expected_level_count,
        true,
    ));
    let plan = &plans[0];
    let picked = &plan.sstable_infos;
    assert_eq!(0, picked[0].1[0].sst_id);
    assert_eq!(1, picked[1].1[0].sst_id);
    assert_eq!(2, picked[2].1[0].sst_id);
    assert_eq!(3, picked[3].1[0].sst_id);
    assert!(!plan.expected);
}
905
/// Checks which sub-level `select_intervals` picks with the interval-selection
/// optimization switched off and on, and that it yields `None` when every sub-level
/// is overlapping.
#[test]
fn test_select_intervals_combined() {
    // Builds a sub-level from its index, type and tables; other fields stay default.
    let make_level = |level_idx, level_type, table_infos| Level {
        level_idx,
        level_type,
        table_infos,
        ..Default::default()
    };
    // All scenarios share the same picker settings except for the final flag.
    let make_picker = |optimize_interval_selection| {
        NonOverlapSubLevelPicker::new(
            0,
            10000,
            1,
            10000,
            Arc::new(RangeOverlapStrategy::default()),
            true,
            compaction_config::max_l0_compact_level_count() as usize,
            optimize_interval_selection,
        )
    };
    let level_handler = LevelHandler::new(0);

    // no optimization: pick the first sub-level
    {
        let l0 = vec![
            make_level(
                1,
                LevelType::Nonoverlapping,
                vec![generate_table(1, 1, 0, 10, 1)],
            ),
            make_level(
                2,
                LevelType::Overlapping,
                vec![generate_table(2, 1, 0, 10, 1)],
            ),
        ];
        let picker = make_picker(false);
        let intervals = picker.select_intervals(&l0, &level_handler).unwrap();
        assert_eq!(intervals.len(), 1);
        assert_eq!(intervals[0].sst_id, 1);
    }

    // with optimization: prefer the sub-level with more non-pending files
    {
        let l0 = vec![
            // 1 file
            make_level(
                1,
                LevelType::Nonoverlapping,
                vec![generate_table(1, 1, 0, 10, 1)],
            ),
            // 2 files
            make_level(
                2,
                LevelType::Nonoverlapping,
                vec![
                    generate_table(2, 1, 0, 10, 1),
                    generate_table(3, 1, 20, 30, 1),
                ],
            ),
        ];
        let picker = make_picker(true);
        let intervals = picker.select_intervals(&l0, &level_handler).unwrap();
        assert_eq!(intervals.len(), 2);
        let picked_ids = intervals.iter().map(|sst| sst.sst_id).collect::<Vec<_>>();
        assert_eq!(picked_ids, vec![2, 3]);
    }

    // all overlapping should return None
    {
        let l0 = vec![make_level(
            1,
            LevelType::Overlapping,
            vec![generate_table(1, 1, 0, 10, 1)],
        )];
        let picker = make_picker(true);
        assert!(picker.select_intervals(&l0, &level_handler).is_none());
    }
}
992}