risingwave_meta/hummock/compaction/picker/
intra_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::config::default::compaction_config;
18use risingwave_hummock_sdk::level::{InputLevel, Levels, OverlappingLevel};
19use risingwave_pb::hummock::{CompactionConfig, LevelType};
20
21use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
22use super::{
23    CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
24    ValidationRuleType,
25};
26use crate::hummock::compaction::picker::TrivialMovePicker;
27use crate::hummock::compaction::{CompactionDeveloperConfig, create_overlap_strategy};
28use crate::hummock::level_handler::LevelHandler;
29
30pub struct IntraCompactionPicker {
31    config: Arc<CompactionConfig>,
32    compaction_task_validator: Arc<CompactionTaskValidator>,
33
34    developer_config: Arc<CompactionDeveloperConfig>,
35}
36
37impl CompactionPicker for IntraCompactionPicker {
38    fn pick_compaction(
39        &mut self,
40        levels: &Levels,
41        level_handlers: &[LevelHandler],
42        stats: &mut LocalPickerStatistic,
43    ) -> Option<CompactionInput> {
44        let l0 = &levels.l0;
45        if l0.sub_levels.is_empty() {
46            return None;
47        }
48
49        if let Some(ret) = self.pick_l0_trivial_move_file(l0, level_handlers, stats) {
50            return Some(ret);
51        }
52
53        let vnode_partition_count = self.config.split_weight_by_vnode;
54
55        if let Some(ret) =
56            self.pick_whole_level(l0, &level_handlers[0], vnode_partition_count, stats)
57        {
58            if ret.input_levels.len() < 2 {
59                tracing::error!(
60                    ?ret,
61                    vnode_partition_count,
62                    "pick_whole_level failed to pick enough levels"
63                );
64                return None;
65            }
66
67            return Some(ret);
68        }
69
70        if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], vnode_partition_count, stats)
71        {
72            if ret.input_levels.len() < 2 {
73                tracing::error!(
74                    ?ret,
75                    vnode_partition_count,
76                    "pick_l0_intra failed to pick enough levels"
77                );
78                return None;
79            }
80
81            return Some(ret);
82        }
83
84        None
85    }
86}
87
88impl IntraCompactionPicker {
89    #[cfg(test)]
90    pub fn for_test(
91        config: Arc<CompactionConfig>,
92        developer_config: Arc<CompactionDeveloperConfig>,
93    ) -> IntraCompactionPicker {
94        IntraCompactionPicker {
95            compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
96            config,
97            developer_config,
98        }
99    }
100
101    pub fn new_with_validator(
102        config: Arc<CompactionConfig>,
103        compaction_task_validator: Arc<CompactionTaskValidator>,
104        developer_config: Arc<CompactionDeveloperConfig>,
105    ) -> IntraCompactionPicker {
106        assert!(config.level0_sub_level_compact_level_count > 1);
107        IntraCompactionPicker {
108            config,
109            compaction_task_validator,
110            developer_config,
111        }
112    }
113
114    fn pick_whole_level(
115        &self,
116        l0: &OverlappingLevel,
117        level_handler: &LevelHandler,
118        partition_count: u32,
119        stats: &mut LocalPickerStatistic,
120    ) -> Option<CompactionInput> {
121        let picker = WholeLevelCompactionPicker::new(
122            self.config.clone(),
123            self.compaction_task_validator.clone(),
124        );
125        picker.pick_whole_level(l0, level_handler, partition_count, stats)
126    }
127
128    fn pick_l0_intra(
129        &self,
130        l0: &OverlappingLevel,
131        level_handler: &LevelHandler,
132        vnode_partition_count: u32,
133        stats: &mut LocalPickerStatistic,
134    ) -> Option<CompactionInput> {
135        let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
136        let mut max_vnode_partition_idx = 0;
137        for (idx, level) in l0.sub_levels.iter().enumerate() {
138            if level.vnode_partition_count < vnode_partition_count {
139                break;
140            }
141            max_vnode_partition_idx = idx;
142        }
143
144        for (idx, level) in l0.sub_levels.iter().enumerate() {
145            if level.level_type != LevelType::Nonoverlapping
146                || level.total_file_size > self.config.sub_level_max_compaction_bytes
147            {
148                continue;
149            }
150
151            if idx > max_vnode_partition_idx {
152                break;
153            }
154
155            if level_handler.is_level_all_pending_compact(level) {
156                continue;
157            }
158
159            let max_compaction_bytes = std::cmp::min(
160                self.config.max_compaction_bytes,
161                self.config.sub_level_max_compaction_bytes,
162            );
163
164            let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
165                self.config.sub_level_max_compaction_bytes / 2,
166                max_compaction_bytes,
167                self.config.level0_sub_level_compact_level_count as usize,
168                self.config.level0_max_compact_file_number,
169                overlap_strategy.clone(),
170                self.developer_config.enable_check_task_level_overlap,
171                self.config
172                    .max_l0_compact_level_count
173                    .unwrap_or(compaction_config::max_l0_compact_level_count())
174                    as usize,
175            );
176
177            let l0_select_tables_vec = non_overlap_sub_level_picker
178                .pick_l0_multi_non_overlap_level(
179                    &l0.sub_levels[idx..=max_vnode_partition_idx],
180                    level_handler,
181                );
182
183            if l0_select_tables_vec.is_empty() {
184                continue;
185            }
186
187            let mut select_input_size = 0;
188            let mut total_file_count = 0;
189            for input in l0_select_tables_vec {
190                let mut max_level_size = 0;
191                for level_select_table in &input.sstable_infos {
192                    let level_select_size = level_select_table
193                        .iter()
194                        .map(|sst| sst.sst_size)
195                        .sum::<u64>();
196
197                    max_level_size = std::cmp::max(max_level_size, level_select_size);
198                }
199
200                let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len());
201                for level_select_sst in input.sstable_infos {
202                    if level_select_sst.is_empty() {
203                        continue;
204                    }
205                    select_level_inputs.push(InputLevel {
206                        level_idx: 0,
207                        level_type: LevelType::Nonoverlapping,
208                        table_infos: level_select_sst,
209                    });
210
211                    select_input_size += input.total_file_size;
212                    total_file_count += input.total_file_count;
213                }
214                select_level_inputs.reverse();
215
216                let result = CompactionInput {
217                    input_levels: select_level_inputs,
218                    target_sub_level_id: level.sub_level_id,
219                    select_input_size,
220                    total_file_count: total_file_count as u64,
221                    ..Default::default()
222                };
223
224                if !self.compaction_task_validator.valid_compact_task(
225                    &result,
226                    ValidationRuleType::Intra,
227                    stats,
228                ) {
229                    continue;
230                }
231
232                return Some(result);
233            }
234        }
235
236        None
237    }
238
239    fn pick_l0_trivial_move_file(
240        &self,
241        l0: &OverlappingLevel,
242        level_handlers: &[LevelHandler],
243        stats: &mut LocalPickerStatistic,
244    ) -> Option<CompactionInput> {
245        if !self.developer_config.enable_trivial_move {
246            return None;
247        }
248
249        let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
250
251        for (idx, level) in l0.sub_levels.iter().enumerate() {
252            if level.level_type == LevelType::Overlapping || idx + 1 >= l0.sub_levels.len() {
253                continue;
254            }
255
256            if l0.sub_levels[idx + 1].level_type == LevelType::Overlapping {
257                continue;
258            }
259
260            if level_handlers[0].is_level_pending_compact(level) {
261                continue;
262            }
263
264            if l0.sub_levels[idx + 1].vnode_partition_count
265                != l0.sub_levels[idx].vnode_partition_count
266            {
267                continue;
268            }
269
270            let trivial_move_picker = TrivialMovePicker::new(
271                0,
272                0,
273                overlap_strategy.clone(),
274                0,
275                self.config
276                    .sst_allowed_trivial_move_max_count
277                    .unwrap_or(compaction_config::sst_allowed_trivial_move_max_count())
278                    as usize,
279            );
280
281            if let Some(select_ssts) = trivial_move_picker.pick_multi_trivial_move_ssts(
282                &l0.sub_levels[idx + 1].table_infos,
283                &level.table_infos,
284                level_handlers,
285                stats,
286            ) {
287                let mut overlap = overlap_strategy.create_overlap_info();
288                select_ssts.iter().for_each(|ssts| overlap.update(ssts));
289
290                assert!(
291                    overlap
292                        .check_multiple_overlap(&l0.sub_levels[idx].table_infos)
293                        .is_empty()
294                );
295
296                let select_input_size = select_ssts.iter().map(|sst| sst.sst_size).sum();
297                let total_file_count = select_ssts.len() as u64;
298                let input_levels = vec![
299                    InputLevel {
300                        level_idx: 0,
301                        level_type: LevelType::Nonoverlapping,
302                        table_infos: select_ssts,
303                    },
304                    InputLevel {
305                        level_idx: 0,
306                        level_type: LevelType::Nonoverlapping,
307                        table_infos: vec![],
308                    },
309                ];
310                return Some(CompactionInput {
311                    input_levels,
312                    target_level: 0,
313                    target_sub_level_id: level.sub_level_id,
314                    select_input_size,
315                    total_file_count,
316                    ..Default::default()
317                });
318            }
319        }
320        None
321    }
322}
323
324pub struct WholeLevelCompactionPicker {
325    config: Arc<CompactionConfig>,
326    compaction_task_validator: Arc<CompactionTaskValidator>,
327}
328
329impl WholeLevelCompactionPicker {
330    pub fn new(
331        config: Arc<CompactionConfig>,
332        compaction_task_validator: Arc<CompactionTaskValidator>,
333    ) -> Self {
334        Self {
335            config,
336            compaction_task_validator,
337        }
338    }
339
340    pub fn pick_whole_level(
341        &self,
342        l0: &OverlappingLevel,
343        level_handler: &LevelHandler,
344        partition_count: u32,
345        stats: &mut LocalPickerStatistic,
346    ) -> Option<CompactionInput> {
347        if partition_count == 0 {
348            return None;
349        }
350        for (idx, level) in l0.sub_levels.iter().enumerate() {
351            if level.level_type != LevelType::Nonoverlapping
352                || level.vnode_partition_count == partition_count
353            {
354                continue;
355            }
356
357            let max_compaction_bytes = std::cmp::max(
358                self.config.max_bytes_for_level_base,
359                self.config.sub_level_max_compaction_bytes
360                    * (self.config.level0_sub_level_compact_level_count as u64),
361            );
362
363            let mut select_input_size = 0;
364
365            let mut select_level_inputs = vec![];
366            let mut total_file_count = 0;
367            let mut wait_enough = false;
368            for next_level in l0.sub_levels.iter().skip(idx) {
369                if (select_input_size > max_compaction_bytes
370                    || total_file_count > self.config.level0_max_compact_file_number
371                    || next_level.vnode_partition_count == partition_count)
372                    && select_level_inputs.len() > 1
373                {
374                    wait_enough = true;
375                    break;
376                }
377
378                if level_handler.is_level_pending_compact(next_level) {
379                    break;
380                }
381
382                select_input_size += next_level.total_file_size;
383                total_file_count += next_level.table_infos.len() as u64;
384
385                select_level_inputs.push(InputLevel {
386                    level_idx: 0,
387                    level_type: next_level.level_type,
388                    table_infos: next_level.table_infos.clone(),
389                });
390            }
391            if select_level_inputs.len() > 1 {
392                let vnode_partition_count =
393                    if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
394                        partition_count
395                    } else {
396                        0
397                    };
398                let result = CompactionInput {
399                    input_levels: select_level_inputs,
400                    target_sub_level_id: level.sub_level_id,
401                    select_input_size,
402                    total_file_count,
403                    vnode_partition_count,
404                    ..Default::default()
405                };
406                if wait_enough
407                    || self.compaction_task_validator.valid_compact_task(
408                        &result,
409                        ValidationRuleType::Intra,
410                        stats,
411                    )
412                {
413                    return Some(result);
414                }
415            }
416        }
417
418        None
419    }
420}
421
422#[cfg(test)]
423pub mod tests {
424    use risingwave_hummock_sdk::level::Level;
425
426    use super::*;
427    use crate::hummock::compaction::TierCompactionPicker;
428    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
429    use crate::hummock::compaction::selector::tests::{
430        generate_l0_nonoverlapping_multi_sublevels, generate_l0_nonoverlapping_sublevels,
431        generate_l0_overlapping_sublevels, generate_level, generate_table,
432        push_table_level0_overlapping, push_tables_level0_nonoverlapping,
433    };
434
435    fn create_compaction_picker_for_test() -> IntraCompactionPicker {
436        let config = Arc::new(
437            CompactionConfigBuilder::new()
438                .level0_tier_compact_file_number(2)
439                .level0_sub_level_compact_level_count(1)
440                .build(),
441        );
442        IntraCompactionPicker::for_test(config, Arc::new(CompactionDeveloperConfig::default()))
443    }
444
445    #[test]
446    fn test_l0_to_l1_compact_conflict() {
447        // When picking L0->L1, L0's selecting_key_range should not be overlapped with L0's
448        // compacting_key_range.
449        let levels = vec![Level {
450            level_idx: 1,
451            level_type: LevelType::Nonoverlapping,
452            table_infos: vec![],
453            ..Default::default()
454        }];
455        let mut levels = Levels {
456            levels,
457            l0: OverlappingLevel {
458                sub_levels: vec![],
459                total_file_size: 0,
460                uncompressed_file_size: 0,
461            },
462            ..Default::default()
463        };
464        push_tables_level0_nonoverlapping(
465            &mut levels,
466            vec![
467                generate_table(1, 1, 100, 300, 2),
468                generate_table(2, 1, 350, 500, 2),
469            ],
470        );
471        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
472
473        let mut local_stats = LocalPickerStatistic::default();
474        push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]);
475        let config: CompactionConfig = CompactionConfigBuilder::new()
476            .level0_tier_compact_file_number(2)
477            .max_compaction_bytes(1000)
478            .sub_level_max_compaction_bytes(150)
479            .max_bytes_for_level_multiplier(1)
480            .level0_sub_level_compact_level_count(3)
481            .build();
482        let mut picker = TierCompactionPicker::new(Arc::new(config));
483
484        let ret: Option<CompactionInput> =
485            picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
486        assert!(ret.is_none());
487    }
488
489    #[test]
490    fn test_compacting_key_range_overlap_intra_l0() {
491        // When picking L0->L0, L0's selecting_key_range should not be overlapped with L0's
492        // compacting_key_range.
493        let mut picker = create_compaction_picker_for_test();
494
495        let mut levels = Levels {
496            levels: vec![Level {
497                level_idx: 1,
498                level_type: LevelType::Nonoverlapping,
499                table_infos: vec![generate_table(3, 1, 200, 300, 2)],
500                ..Default::default()
501            }],
502            l0: generate_l0_nonoverlapping_sublevels(vec![
503                generate_table(1, 1, 100, 210, 2),
504                generate_table(2, 1, 200, 250, 2),
505            ]),
506            ..Default::default()
507        };
508        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
509
510        let mut local_stats = LocalPickerStatistic::default();
511        let ret = picker
512            .pick_compaction(&levels, &levels_handler, &mut local_stats)
513            .unwrap();
514        ret.add_pending_task(0, &mut levels_handler);
515
516        push_table_level0_overlapping(&mut levels, generate_table(4, 1, 170, 180, 3));
517        assert!(
518            picker
519                .pick_compaction(&levels, &levels_handler, &mut local_stats)
520                .is_none()
521        );
522    }
523
524    #[test]
525    fn test_pick_l0_intra() {
526        {
527            let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
528                vec![
529                    generate_table(6, 1, 50, 99, 1),
530                    generate_table(1, 1, 100, 200, 1),
531                    generate_table(2, 1, 250, 300, 1),
532                ],
533                vec![
534                    generate_table(3, 1, 10, 90, 1),
535                    generate_table(6, 1, 100, 110, 1),
536                ],
537                vec![
538                    generate_table(4, 1, 50, 99, 1),
539                    generate_table(5, 1, 100, 200, 1),
540                ],
541            ]);
542            let levels = Levels {
543                l0,
544                levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
545                ..Default::default()
546            };
547            let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
548            levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
549            let config = Arc::new(
550                CompactionConfigBuilder::new()
551                    .level0_sub_level_compact_level_count(1)
552                    .level0_overlapping_sub_level_compact_level_count(4)
553                    .build(),
554            );
555            let mut picker = IntraCompactionPicker::for_test(
556                config,
557                Arc::new(CompactionDeveloperConfig::default()),
558            );
559            let mut local_stats = LocalPickerStatistic::default();
560            let ret = picker
561                .pick_compaction(&levels, &levels_handler, &mut local_stats)
562                .unwrap();
563            ret.add_pending_task(1, &mut levels_handler);
564            assert_eq!(
565                ret.input_levels
566                    .iter()
567                    .map(|i| i.table_infos.len())
568                    .sum::<usize>(),
569                3
570            );
571        }
572
573        {
574            // Suppose keyguard [100, 200] [300, 400]
575            // will pick sst [1, 3, 4]
576            let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
577                vec![
578                    generate_table(1, 1, 100, 200, 1),
579                    generate_table(2, 1, 300, 400, 1),
580                ],
581                vec![
582                    generate_table(3, 1, 100, 200, 1),
583                    generate_table(6, 1, 300, 500, 1),
584                ],
585                vec![
586                    generate_table(4, 1, 100, 200, 1),
587                    generate_table(5, 1, 300, 400, 1),
588                ],
589            ]);
590            let levels = Levels {
591                l0,
592                levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
593                ..Default::default()
594            };
595            let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
596            levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
597            let config = Arc::new(
598                CompactionConfigBuilder::new()
599                    .level0_sub_level_compact_level_count(1)
600                    .sub_level_max_compaction_bytes(300)
601                    .build(),
602            );
603            let mut picker = IntraCompactionPicker::for_test(
604                config,
605                Arc::new(CompactionDeveloperConfig::default()),
606            );
607            let mut local_stats = LocalPickerStatistic::default();
608            let ret = picker
609                .pick_compaction(&levels, &levels_handler, &mut local_stats)
610                .unwrap();
611            ret.add_pending_task(1, &mut levels_handler);
612            assert_eq!(
613                ret.input_levels
614                    .iter()
615                    .map(|i| i.table_infos.len())
616                    .sum::<usize>(),
617                3
618            );
619
620            assert_eq!(4, ret.input_levels[0].table_infos[0].sst_id);
621            assert_eq!(3, ret.input_levels[1].table_infos[0].sst_id);
622            assert_eq!(1, ret.input_levels[2].table_infos[0].sst_id);
623
624            // will pick sst [2, 6]
625            let ret2 = picker
626                .pick_compaction(&levels, &levels_handler, &mut local_stats)
627                .unwrap();
628
629            assert_eq!(
630                ret2.input_levels
631                    .iter()
632                    .map(|i| i.table_infos.len())
633                    .sum::<usize>(),
634                2
635            );
636
637            assert_eq!(6, ret2.input_levels[0].table_infos[0].sst_id);
638            assert_eq!(2, ret2.input_levels[1].table_infos[0].sst_id);
639        }
640
641        {
642            let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
643                vec![
644                    generate_table(1, 1, 100, 149, 1),
645                    generate_table(6, 1, 150, 199, 1),
646                    generate_table(7, 1, 200, 250, 1),
647                    generate_table(2, 1, 300, 400, 1),
648                ],
649                vec![
650                    generate_table(3, 1, 100, 149, 1),
651                    generate_table(8, 1, 150, 199, 1),
652                    generate_table(9, 1, 200, 250, 1),
653                    generate_table(10, 1, 300, 400, 1),
654                ],
655                vec![
656                    generate_table(4, 1, 100, 199, 1),
657                    generate_table(11, 1, 200, 250, 1),
658                    generate_table(5, 1, 300, 350, 1),
659                ],
660            ]);
661            let levels = Levels {
662                l0,
663                levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
664                ..Default::default()
665            };
666            let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
667            levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
668            let config = Arc::new(
669                CompactionConfigBuilder::new()
670                    .level0_sub_level_compact_level_count(1)
671                    .sub_level_max_compaction_bytes(300)
672                    .build(),
673            );
674            let mut picker = IntraCompactionPicker::for_test(
675                config,
676                Arc::new(CompactionDeveloperConfig::default()),
677            );
678            let mut local_stats = LocalPickerStatistic::default();
679            let ret = picker
680                .pick_compaction(&levels, &levels_handler, &mut local_stats)
681                .unwrap();
682            ret.add_pending_task(1, &mut levels_handler);
683            assert_eq!(
684                ret.input_levels
685                    .iter()
686                    .map(|i| i.table_infos.len())
687                    .sum::<usize>(),
688                3
689            );
690
691            assert_eq!(11, ret.input_levels[0].table_infos[0].sst_id);
692            assert_eq!(9, ret.input_levels[1].table_infos[0].sst_id);
693            assert_eq!(7, ret.input_levels[2].table_infos[0].sst_id);
694
695            let ret2 = picker
696                .pick_compaction(&levels, &levels_handler, &mut local_stats)
697                .unwrap();
698
699            assert_eq!(
700                ret2.input_levels
701                    .iter()
702                    .map(|i| i.table_infos.len())
703                    .sum::<usize>(),
704                3
705            );
706
707            assert_eq!(5, ret2.input_levels[0].table_infos[0].sst_id);
708            assert_eq!(10, ret2.input_levels[1].table_infos[0].sst_id);
709            assert_eq!(2, ret2.input_levels[2].table_infos[0].sst_id);
710        }
711    }
712
713    fn is_l0_trivial_move(compaction_input: &CompactionInput) -> bool {
714        compaction_input.input_levels.len() == 2
715            && !compaction_input.input_levels[0].table_infos.is_empty()
716            && compaction_input.input_levels[1].table_infos.is_empty()
717    }
718
719    #[test]
720    fn test_trivial_move() {
721        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
722        let config = Arc::new(
723            CompactionConfigBuilder::new()
724                .level0_tier_compact_file_number(2)
725                .target_file_size_base(30)
726                .level0_sub_level_compact_level_count(20) // reject intra
727                .build(),
728        );
729        let mut picker =
730            IntraCompactionPicker::for_test(config, Arc::new(CompactionDeveloperConfig::default()));
731
732        // Cannot trivial move because there is only 1 sub-level.
733        let l0 = generate_l0_overlapping_sublevels(vec![vec![
734            generate_table(1, 1, 100, 110, 1),
735            generate_table(2, 1, 150, 250, 1),
736        ]]);
737        let levels = Levels {
738            l0,
739            levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
740            ..Default::default()
741        };
742        levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
743        let mut local_stats = LocalPickerStatistic::default();
744        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
745        assert!(ret.is_none());
746
747        // Cannot trivial move because sub-levels are overlapping
748        let l0: OverlappingLevel = generate_l0_overlapping_sublevels(vec![
749            vec![
750                generate_table(1, 1, 100, 110, 1),
751                generate_table(2, 1, 150, 250, 1),
752            ],
753            vec![generate_table(3, 1, 10, 90, 1)],
754            vec![generate_table(4, 1, 10, 90, 1)],
755            vec![generate_table(5, 1, 10, 90, 1)],
756        ]);
757        let mut levels = Levels {
758            l0,
759            levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
760            ..Default::default()
761        };
762        assert!(
763            picker
764                .pick_compaction(&levels, &levels_handler, &mut local_stats)
765                .is_none()
766        );
767
768        // Cannot trivial move because latter sub-level is overlapping
769        levels.l0.sub_levels[0].level_type = LevelType::Nonoverlapping;
770        levels.l0.sub_levels[1].level_type = LevelType::Overlapping;
771        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
772        assert!(ret.is_none());
773
774        // Cannot trivial move because former sub-level is overlapping
775        levels.l0.sub_levels[0].level_type = LevelType::Overlapping;
776        levels.l0.sub_levels[1].level_type = LevelType::Nonoverlapping;
777        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
778        assert!(ret.is_none());
779
780        // trivial move
781        levels.l0.sub_levels[0].level_type = LevelType::Nonoverlapping;
782        levels.l0.sub_levels[1].level_type = LevelType::Nonoverlapping;
783        let ret = picker
784            .pick_compaction(&levels, &levels_handler, &mut local_stats)
785            .unwrap();
786        assert!(is_l0_trivial_move(&ret));
787        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
788    }
789    #[test]
790    fn test_pick_whole_level() {
791        let config = Arc::new(
792            CompactionConfigBuilder::new()
793                .level0_max_compact_file_number(20)
794                .build(),
795        );
796        let mut table_infos = vec![];
797        for epoch in 1..3 {
798            let base = epoch * 100;
799            let mut ssts = vec![];
800            for i in 1..50 {
801                let left = (i as usize) * 100;
802                let right = left + 100;
803                ssts.push(generate_table(base + i, 1, left, right, epoch));
804            }
805            table_infos.push(ssts);
806        }
807
808        let l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
809        let compaction_task_validator = Arc::new(CompactionTaskValidator::new(config.clone()));
810        let picker = WholeLevelCompactionPicker::new(config, compaction_task_validator);
811        let level_handler = LevelHandler::new(0);
812        let ret = picker
813            .pick_whole_level(&l0, &level_handler, 4, &mut LocalPickerStatistic::default())
814            .unwrap();
815        assert_eq!(ret.input_levels.len(), 2);
816    }
817
818    #[test]
819    fn test_priority() {
820        let config = Arc::new(
821            CompactionConfigBuilder::new()
822                .level0_max_compact_file_number(20)
823                .sub_level_max_compaction_bytes(1)
824                .level0_sub_level_compact_level_count(2)
825                .build(),
826        );
827        let mut table_infos = vec![];
828        for epoch in 1..3 {
829            let base = epoch * 100;
830            let mut ssts = vec![];
831            for i in 1..50 {
832                let left = (i as usize) * 100;
833                let right = left + 100;
834                ssts.push(generate_table(base + i, 1, left, right, epoch));
835            }
836            table_infos.push(ssts);
837        }
838
839        let mut l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
840        // trivial-move
841        l0.sub_levels[1]
842            .table_infos
843            .push(generate_table(9999, 900000000, 0, 100, 1));
844
845        l0.sub_levels[0].total_file_size = 1;
846        l0.sub_levels[1].total_file_size = 1;
847
848        let mut picker = IntraCompactionPicker::new_with_validator(
849            config,
850            Arc::new(CompactionTaskValidator::unused()),
851            Arc::new(CompactionDeveloperConfig::default()),
852        );
853        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
854        let mut local_stats = LocalPickerStatistic::default();
855
856        let levels = Levels {
857            l0,
858            levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
859            ..Default::default()
860        };
861
862        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
863        assert!(is_l0_trivial_move(ret.as_ref().unwrap()));
864        ret.as_ref()
865            .unwrap()
866            .add_pending_task(1, &mut levels_handler);
867        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
868        assert!(ret.is_some());
869        let input = ret.as_ref().unwrap();
870        assert_eq!(input.input_levels.len(), 2);
871        assert_ne!(
872            levels.l0.sub_levels[0].table_infos.len(),
873            input.input_levels[0].table_infos.len()
874        );
875        assert_ne!(
876            levels.l0.sub_levels[1].table_infos.len(),
877            input.input_levels[1].table_infos.len()
878        );
879    }
880}