risingwave_meta/hummock/compaction/picker/
intra_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::config::default::compaction_config;
18use risingwave_hummock_sdk::level::{InputLevel, Levels, OverlappingLevel};
19use risingwave_pb::hummock::{CompactionConfig, LevelType};
20
21use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
22use super::{
23    CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
24    ValidationRuleType,
25};
26use crate::hummock::compaction::picker::TrivialMovePicker;
27use crate::hummock::compaction::{CompactionDeveloperConfig, create_overlap_strategy};
28use crate::hummock::level_handler::LevelHandler;
29
30pub struct IntraCompactionPicker {
31    config: Arc<CompactionConfig>,
32    compaction_task_validator: Arc<CompactionTaskValidator>,
33
34    developer_config: Arc<CompactionDeveloperConfig>,
35}
36
37impl CompactionPicker for IntraCompactionPicker {
38    fn pick_compaction(
39        &mut self,
40        levels: &Levels,
41        level_handlers: &[LevelHandler],
42        stats: &mut LocalPickerStatistic,
43    ) -> Option<CompactionInput> {
44        let l0 = &levels.l0;
45        if l0.sub_levels.is_empty() {
46            return None;
47        }
48
49        if let Some(ret) = self.pick_l0_trivial_move_file(l0, level_handlers, stats) {
50            return Some(ret);
51        }
52
53        let vnode_partition_count = self.config.split_weight_by_vnode;
54
55        if let Some(ret) =
56            self.pick_whole_level(l0, &level_handlers[0], vnode_partition_count, stats)
57        {
58            if ret.input_levels.len() < 2 {
59                tracing::error!(
60                    ?ret,
61                    vnode_partition_count,
62                    "pick_whole_level failed to pick enough levels"
63                );
64                return None;
65            }
66
67            return Some(ret);
68        }
69
70        if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], vnode_partition_count, stats)
71        {
72            if ret.input_levels.len() < 2 {
73                tracing::error!(
74                    ?ret,
75                    vnode_partition_count,
76                    "pick_l0_intra failed to pick enough levels"
77                );
78                return None;
79            }
80
81            return Some(ret);
82        }
83
84        None
85    }
86}
87
88impl IntraCompactionPicker {
89    #[cfg(test)]
90    pub fn for_test(
91        config: Arc<CompactionConfig>,
92        developer_config: Arc<CompactionDeveloperConfig>,
93    ) -> IntraCompactionPicker {
94        IntraCompactionPicker {
95            compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
96            config,
97            developer_config,
98        }
99    }
100
101    pub fn new_with_validator(
102        config: Arc<CompactionConfig>,
103        compaction_task_validator: Arc<CompactionTaskValidator>,
104        developer_config: Arc<CompactionDeveloperConfig>,
105    ) -> IntraCompactionPicker {
106        assert!(config.level0_sub_level_compact_level_count > 1);
107        IntraCompactionPicker {
108            config,
109            compaction_task_validator,
110            developer_config,
111        }
112    }
113
114    fn pick_whole_level(
115        &self,
116        l0: &OverlappingLevel,
117        level_handler: &LevelHandler,
118        partition_count: u32,
119        stats: &mut LocalPickerStatistic,
120    ) -> Option<CompactionInput> {
121        let picker = WholeLevelCompactionPicker::new(
122            self.config.clone(),
123            self.compaction_task_validator.clone(),
124        );
125        picker.pick_whole_level(l0, level_handler, partition_count, stats)
126    }
127
128    fn pick_l0_intra(
129        &self,
130        l0: &OverlappingLevel,
131        level_handler: &LevelHandler,
132        vnode_partition_count: u32,
133        stats: &mut LocalPickerStatistic,
134    ) -> Option<CompactionInput> {
135        let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
136        let mut max_vnode_partition_idx = 0;
137        for (idx, level) in l0.sub_levels.iter().enumerate() {
138            if level.vnode_partition_count < vnode_partition_count {
139                break;
140            }
141            max_vnode_partition_idx = idx;
142        }
143
144        for (idx, level) in l0.sub_levels.iter().enumerate() {
145            if level.level_type != LevelType::Nonoverlapping
146                || level.total_file_size > self.config.sub_level_max_compaction_bytes
147            {
148                continue;
149            }
150
151            if idx > max_vnode_partition_idx {
152                break;
153            }
154
155            if level_handler.is_level_all_pending_compact(level) {
156                continue;
157            }
158
159            let max_compaction_bytes = std::cmp::min(
160                self.config.max_compaction_bytes,
161                self.config.sub_level_max_compaction_bytes,
162            );
163
164            let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
165                self.config.sub_level_max_compaction_bytes / 2,
166                max_compaction_bytes,
167                self.config.level0_sub_level_compact_level_count as usize,
168                self.config.level0_max_compact_file_number,
169                overlap_strategy.clone(),
170                self.developer_config.enable_check_task_level_overlap,
171                self.config
172                    .max_l0_compact_level_count
173                    .unwrap_or(compaction_config::max_l0_compact_level_count())
174                    as usize,
175                self.config
176                    .enable_optimize_l0_interval_selection
177                    .unwrap_or(compaction_config::enable_optimize_l0_interval_selection()),
178            );
179
180            let l0_select_tables_vec = non_overlap_sub_level_picker
181                .pick_l0_multi_non_overlap_level(
182                    &l0.sub_levels[idx..=max_vnode_partition_idx],
183                    level_handler,
184                );
185
186            if l0_select_tables_vec.is_empty() {
187                continue;
188            }
189
190            let mut select_input_size = 0;
191            let mut total_file_count = 0;
192            for input in l0_select_tables_vec {
193                let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len());
194                let mut target_sub_level_id = None;
195                for (sub_level_id, level_select_sst) in input.sstable_infos {
196                    if level_select_sst.is_empty() {
197                        continue;
198                    }
199
200                    if target_sub_level_id.is_none() {
201                        target_sub_level_id = Some(sub_level_id);
202                    }
203
204                    select_level_inputs.push(InputLevel {
205                        level_idx: 0,
206                        level_type: LevelType::Nonoverlapping,
207                        table_infos: level_select_sst,
208                    });
209
210                    select_input_size += input.total_file_size;
211                    total_file_count += input.total_file_count;
212                }
213                select_level_inputs.reverse();
214
215                let result = CompactionInput {
216                    input_levels: select_level_inputs,
217                    target_sub_level_id: target_sub_level_id.unwrap(),
218                    select_input_size,
219                    total_file_count: total_file_count as u64,
220                    ..Default::default()
221                };
222
223                if !self.compaction_task_validator.valid_compact_task(
224                    &result,
225                    ValidationRuleType::Intra,
226                    stats,
227                ) {
228                    continue;
229                }
230
231                return Some(result);
232            }
233        }
234
235        None
236    }
237
238    fn pick_l0_trivial_move_file(
239        &self,
240        l0: &OverlappingLevel,
241        level_handlers: &[LevelHandler],
242        stats: &mut LocalPickerStatistic,
243    ) -> Option<CompactionInput> {
244        if !self.developer_config.enable_trivial_move {
245            return None;
246        }
247
248        let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
249
250        for (idx, level) in l0.sub_levels.iter().enumerate() {
251            if level.level_type == LevelType::Overlapping || idx + 1 >= l0.sub_levels.len() {
252                continue;
253            }
254
255            if l0.sub_levels[idx + 1].level_type == LevelType::Overlapping {
256                continue;
257            }
258
259            if level_handlers[0].is_level_pending_compact(level) {
260                continue;
261            }
262
263            if l0.sub_levels[idx + 1].vnode_partition_count
264                != l0.sub_levels[idx].vnode_partition_count
265            {
266                continue;
267            }
268
269            let trivial_move_picker = TrivialMovePicker::new(
270                0,
271                0,
272                overlap_strategy.clone(),
273                0,
274                self.config
275                    .sst_allowed_trivial_move_max_count
276                    .unwrap_or(compaction_config::sst_allowed_trivial_move_max_count())
277                    as usize,
278            );
279
280            if let Some(select_ssts) = trivial_move_picker.pick_multi_trivial_move_ssts(
281                &l0.sub_levels[idx + 1].table_infos,
282                &level.table_infos,
283                level_handlers,
284                stats,
285            ) {
286                let mut overlap = overlap_strategy.create_overlap_info();
287                select_ssts
288                    .iter()
289                    .for_each(|sst| overlap.update(&sst.key_range));
290
291                assert!(
292                    overlap
293                        .check_multiple_overlap(&l0.sub_levels[idx].table_infos)
294                        .is_empty()
295                );
296
297                let select_input_size = select_ssts.iter().map(|sst| sst.sst_size).sum();
298                let total_file_count = select_ssts.len() as u64;
299                let input_levels = vec![
300                    InputLevel {
301                        level_idx: 0,
302                        level_type: LevelType::Nonoverlapping,
303                        table_infos: select_ssts,
304                    },
305                    InputLevel {
306                        level_idx: 0,
307                        level_type: LevelType::Nonoverlapping,
308                        table_infos: vec![],
309                    },
310                ];
311                return Some(CompactionInput {
312                    input_levels,
313                    target_level: 0,
314                    target_sub_level_id: level.sub_level_id,
315                    select_input_size,
316                    total_file_count,
317                    ..Default::default()
318                });
319            }
320        }
321        None
322    }
323}
324
325pub struct WholeLevelCompactionPicker {
326    config: Arc<CompactionConfig>,
327    compaction_task_validator: Arc<CompactionTaskValidator>,
328}
329
330impl WholeLevelCompactionPicker {
331    pub fn new(
332        config: Arc<CompactionConfig>,
333        compaction_task_validator: Arc<CompactionTaskValidator>,
334    ) -> Self {
335        Self {
336            config,
337            compaction_task_validator,
338        }
339    }
340
341    pub fn pick_whole_level(
342        &self,
343        l0: &OverlappingLevel,
344        level_handler: &LevelHandler,
345        partition_count: u32,
346        stats: &mut LocalPickerStatistic,
347    ) -> Option<CompactionInput> {
348        if partition_count == 0 {
349            return None;
350        }
351        for (idx, level) in l0.sub_levels.iter().enumerate() {
352            if level.level_type != LevelType::Nonoverlapping
353                || level.vnode_partition_count == partition_count
354            {
355                continue;
356            }
357
358            let max_compaction_bytes = std::cmp::max(
359                self.config.max_bytes_for_level_base,
360                self.config.sub_level_max_compaction_bytes
361                    * (self.config.level0_sub_level_compact_level_count as u64),
362            );
363
364            let mut select_input_size = 0;
365
366            let mut select_level_inputs = vec![];
367            let mut total_file_count = 0;
368            let mut wait_enough = false;
369            for next_level in l0.sub_levels.iter().skip(idx) {
370                if (select_input_size > max_compaction_bytes
371                    || total_file_count > self.config.level0_max_compact_file_number
372                    || next_level.vnode_partition_count == partition_count)
373                    && select_level_inputs.len() > 1
374                {
375                    wait_enough = true;
376                    break;
377                }
378
379                if level_handler.is_level_pending_compact(next_level) {
380                    break;
381                }
382
383                select_input_size += next_level.total_file_size;
384                total_file_count += next_level.table_infos.len() as u64;
385
386                select_level_inputs.push(InputLevel {
387                    level_idx: 0,
388                    level_type: next_level.level_type,
389                    table_infos: next_level.table_infos.clone(),
390                });
391            }
392            if select_level_inputs.len() > 1 {
393                let vnode_partition_count =
394                    if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
395                        partition_count
396                    } else {
397                        0
398                    };
399                let result = CompactionInput {
400                    input_levels: select_level_inputs,
401                    target_sub_level_id: level.sub_level_id,
402                    select_input_size,
403                    total_file_count,
404                    vnode_partition_count,
405                    ..Default::default()
406                };
407                if wait_enough
408                    || self.compaction_task_validator.valid_compact_task(
409                        &result,
410                        ValidationRuleType::Intra,
411                        stats,
412                    )
413                {
414                    return Some(result);
415                }
416            }
417        }
418
419        None
420    }
421}
422
423#[cfg(test)]
424pub mod tests {
425    use risingwave_hummock_sdk::level::Level;
426
427    use super::*;
428    use crate::hummock::compaction::TierCompactionPicker;
429    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
430    use crate::hummock::compaction::selector::tests::{
431        generate_l0_nonoverlapping_multi_sublevels, generate_l0_nonoverlapping_sublevels,
432        generate_l0_overlapping_sublevels, generate_level, generate_table,
433        push_table_level0_overlapping, push_tables_level0_nonoverlapping,
434    };
435
436    fn create_compaction_picker_for_test() -> IntraCompactionPicker {
437        let config = Arc::new(
438            CompactionConfigBuilder::new()
439                .level0_tier_compact_file_number(2)
440                .level0_sub_level_compact_level_count(1)
441                .build(),
442        );
443        IntraCompactionPicker::for_test(config, Arc::new(CompactionDeveloperConfig::default()))
444    }
445
446    #[test]
447    fn test_l0_to_l1_compact_conflict() {
448        // When picking L0->L1, L0's selecting_key_range should not be overlapped with L0's
449        // compacting_key_range.
450        let levels = vec![Level {
451            level_idx: 1,
452            level_type: LevelType::Nonoverlapping,
453            table_infos: vec![],
454            ..Default::default()
455        }];
456        let mut levels = Levels {
457            levels,
458            l0: OverlappingLevel {
459                sub_levels: vec![],
460                total_file_size: 0,
461                uncompressed_file_size: 0,
462            },
463            ..Default::default()
464        };
465        push_tables_level0_nonoverlapping(
466            &mut levels,
467            vec![
468                generate_table(1, 1, 100, 300, 2),
469                generate_table(2, 1, 350, 500, 2),
470            ],
471        );
472        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
473
474        let mut local_stats = LocalPickerStatistic::default();
475        push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]);
476        let config: CompactionConfig = CompactionConfigBuilder::new()
477            .level0_tier_compact_file_number(2)
478            .max_compaction_bytes(1000)
479            .sub_level_max_compaction_bytes(150)
480            .max_bytes_for_level_multiplier(1)
481            .level0_sub_level_compact_level_count(3)
482            .build();
483        let mut picker = TierCompactionPicker::new(Arc::new(config));
484
485        let ret: Option<CompactionInput> =
486            picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
487        assert!(ret.is_none());
488    }
489
490    #[test]
491    fn test_compacting_key_range_overlap_intra_l0() {
492        // When picking L0->L0, L0's selecting_key_range should not be overlapped with L0's
493        // compacting_key_range.
494        let mut picker = create_compaction_picker_for_test();
495
496        let mut levels = Levels {
497            levels: vec![Level {
498                level_idx: 1,
499                level_type: LevelType::Nonoverlapping,
500                table_infos: vec![generate_table(3, 1, 200, 300, 2)],
501                ..Default::default()
502            }],
503            l0: generate_l0_nonoverlapping_sublevels(vec![
504                generate_table(1, 1, 100, 210, 2),
505                generate_table(2, 1, 200, 250, 2),
506            ]),
507            ..Default::default()
508        };
509        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
510
511        let mut local_stats = LocalPickerStatistic::default();
512        let ret = picker
513            .pick_compaction(&levels, &levels_handler, &mut local_stats)
514            .unwrap();
515        ret.add_pending_task(0, &mut levels_handler);
516
517        push_table_level0_overlapping(&mut levels, generate_table(4, 1, 170, 180, 3));
518        assert!(
519            picker
520                .pick_compaction(&levels, &levels_handler, &mut local_stats)
521                .is_none()
522        );
523    }
524
525    #[test]
526    fn test_pick_l0_intra() {
527        {
528            let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
529                vec![
530                    generate_table(6, 1, 50, 99, 1),
531                    generate_table(1, 1, 100, 200, 1),
532                    generate_table(2, 1, 250, 300, 1),
533                ],
534                vec![
535                    generate_table(3, 1, 10, 90, 1),
536                    generate_table(6, 1, 100, 110, 1),
537                ],
538                vec![
539                    generate_table(4, 1, 50, 99, 1),
540                    generate_table(5, 1, 100, 200, 1),
541                ],
542            ]);
543            let levels = Levels {
544                l0,
545                levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
546                ..Default::default()
547            };
548            let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
549            levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
550            let config = Arc::new(
551                CompactionConfigBuilder::new()
552                    .level0_sub_level_compact_level_count(1)
553                    .level0_overlapping_sub_level_compact_level_count(4)
554                    .build(),
555            );
556            let mut picker = IntraCompactionPicker::for_test(
557                config,
558                Arc::new(CompactionDeveloperConfig::default()),
559            );
560            let mut local_stats = LocalPickerStatistic::default();
561            let ret = picker
562                .pick_compaction(&levels, &levels_handler, &mut local_stats)
563                .unwrap();
564            ret.add_pending_task(1, &mut levels_handler);
565            assert_eq!(
566                ret.input_levels
567                    .iter()
568                    .map(|i| i.table_infos.len())
569                    .sum::<usize>(),
570                3
571            );
572        }
573
574        {
575            // Suppose keyguard [100, 200] [300, 400]
576            // will pick sst [1, 3, 4]
577            let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
578                vec![
579                    generate_table(1, 1, 100, 200, 1),
580                    generate_table(2, 1, 300, 400, 1),
581                ],
582                vec![
583                    generate_table(3, 1, 100, 200, 1),
584                    generate_table(6, 1, 300, 500, 1),
585                ],
586                vec![
587                    generate_table(4, 1, 100, 200, 1),
588                    generate_table(5, 1, 300, 400, 1),
589                ],
590            ]);
591            let levels = Levels {
592                l0,
593                levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
594                ..Default::default()
595            };
596            let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
597            levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
598            let config = Arc::new(
599                CompactionConfigBuilder::new()
600                    .level0_sub_level_compact_level_count(1)
601                    .sub_level_max_compaction_bytes(300)
602                    .build(),
603            );
604            let mut picker = IntraCompactionPicker::for_test(
605                config,
606                Arc::new(CompactionDeveloperConfig::default()),
607            );
608            let mut local_stats = LocalPickerStatistic::default();
609            let ret = picker
610                .pick_compaction(&levels, &levels_handler, &mut local_stats)
611                .unwrap();
612            ret.add_pending_task(1, &mut levels_handler);
613            assert_eq!(
614                ret.input_levels
615                    .iter()
616                    .map(|i| i.table_infos.len())
617                    .sum::<usize>(),
618                3
619            );
620
621            assert_eq!(4, ret.input_levels[0].table_infos[0].sst_id);
622            assert_eq!(3, ret.input_levels[1].table_infos[0].sst_id);
623            assert_eq!(1, ret.input_levels[2].table_infos[0].sst_id);
624
625            // will pick sst [2, 6]
626            let ret2 = picker
627                .pick_compaction(&levels, &levels_handler, &mut local_stats)
628                .unwrap();
629
630            assert_eq!(
631                ret2.input_levels
632                    .iter()
633                    .map(|i| i.table_infos.len())
634                    .sum::<usize>(),
635                2
636            );
637
638            assert_eq!(6, ret2.input_levels[0].table_infos[0].sst_id);
639            assert_eq!(2, ret2.input_levels[1].table_infos[0].sst_id);
640        }
641
642        {
643            let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
644                vec![
645                    generate_table(1, 1, 100, 149, 1),
646                    generate_table(6, 1, 150, 199, 1),
647                    generate_table(7, 1, 200, 250, 1),
648                    generate_table(2, 1, 300, 400, 1),
649                ],
650                vec![
651                    generate_table(3, 1, 100, 149, 1),
652                    generate_table(8, 1, 150, 199, 1),
653                    generate_table(9, 1, 200, 250, 1),
654                    generate_table(10, 1, 300, 400, 1),
655                ],
656                vec![
657                    generate_table(4, 1, 100, 199, 1),
658                    generate_table(11, 1, 200, 250, 1),
659                    generate_table(5, 1, 300, 350, 1),
660                ],
661            ]);
662            let levels = Levels {
663                l0,
664                levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
665                ..Default::default()
666            };
667            let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
668            levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
669            let config = Arc::new(
670                CompactionConfigBuilder::new()
671                    .level0_sub_level_compact_level_count(1)
672                    .sub_level_max_compaction_bytes(300)
673                    .build(),
674            );
675            let mut picker = IntraCompactionPicker::for_test(
676                config,
677                Arc::new(CompactionDeveloperConfig::default()),
678            );
679            let mut local_stats = LocalPickerStatistic::default();
680            let ret = picker
681                .pick_compaction(&levels, &levels_handler, &mut local_stats)
682                .unwrap();
683            ret.add_pending_task(1, &mut levels_handler);
684            assert_eq!(
685                ret.input_levels
686                    .iter()
687                    .map(|i| i.table_infos.len())
688                    .sum::<usize>(),
689                3
690            );
691
692            assert_eq!(11, ret.input_levels[0].table_infos[0].sst_id);
693            assert_eq!(9, ret.input_levels[1].table_infos[0].sst_id);
694            assert_eq!(7, ret.input_levels[2].table_infos[0].sst_id);
695
696            let ret2 = picker
697                .pick_compaction(&levels, &levels_handler, &mut local_stats)
698                .unwrap();
699
700            assert_eq!(
701                ret2.input_levels
702                    .iter()
703                    .map(|i| i.table_infos.len())
704                    .sum::<usize>(),
705                3
706            );
707
708            assert_eq!(5, ret2.input_levels[0].table_infos[0].sst_id);
709            assert_eq!(10, ret2.input_levels[1].table_infos[0].sst_id);
710            assert_eq!(2, ret2.input_levels[2].table_infos[0].sst_id);
711        }
712    }
713
714    fn is_l0_trivial_move(compaction_input: &CompactionInput) -> bool {
715        compaction_input.input_levels.len() == 2
716            && !compaction_input.input_levels[0].table_infos.is_empty()
717            && compaction_input.input_levels[1].table_infos.is_empty()
718    }
719
720    #[test]
721    fn test_trivial_move() {
722        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
723        let config = Arc::new(
724            CompactionConfigBuilder::new()
725                .level0_tier_compact_file_number(2)
726                .target_file_size_base(30)
727                .level0_sub_level_compact_level_count(20) // reject intra
728                .build(),
729        );
730        let mut picker =
731            IntraCompactionPicker::for_test(config, Arc::new(CompactionDeveloperConfig::default()));
732
733        // Cannot trivial move because there is only 1 sub-level.
734        let l0 = generate_l0_overlapping_sublevels(vec![vec![
735            generate_table(1, 1, 100, 110, 1),
736            generate_table(2, 1, 150, 250, 1),
737        ]]);
738        let levels = Levels {
739            l0,
740            levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
741            ..Default::default()
742        };
743        levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
744        let mut local_stats = LocalPickerStatistic::default();
745        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
746        assert!(ret.is_none());
747
748        // Cannot trivial move because sub-levels are overlapping
749        let l0: OverlappingLevel = generate_l0_overlapping_sublevels(vec![
750            vec![
751                generate_table(1, 1, 100, 110, 1),
752                generate_table(2, 1, 150, 250, 1),
753            ],
754            vec![generate_table(3, 1, 10, 90, 1)],
755            vec![generate_table(4, 1, 10, 90, 1)],
756            vec![generate_table(5, 1, 10, 90, 1)],
757        ]);
758        let mut levels = Levels {
759            l0,
760            levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
761            ..Default::default()
762        };
763        assert!(
764            picker
765                .pick_compaction(&levels, &levels_handler, &mut local_stats)
766                .is_none()
767        );
768
769        // Cannot trivial move because latter sub-level is overlapping
770        levels.l0.sub_levels[0].level_type = LevelType::Nonoverlapping;
771        levels.l0.sub_levels[1].level_type = LevelType::Overlapping;
772        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
773        assert!(ret.is_none());
774
775        // Cannot trivial move because former sub-level is overlapping
776        levels.l0.sub_levels[0].level_type = LevelType::Overlapping;
777        levels.l0.sub_levels[1].level_type = LevelType::Nonoverlapping;
778        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
779        assert!(ret.is_none());
780
781        // trivial move
782        levels.l0.sub_levels[0].level_type = LevelType::Nonoverlapping;
783        levels.l0.sub_levels[1].level_type = LevelType::Nonoverlapping;
784        let ret = picker
785            .pick_compaction(&levels, &levels_handler, &mut local_stats)
786            .unwrap();
787        assert!(is_l0_trivial_move(&ret));
788        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
789    }
790    #[test]
791    fn test_pick_whole_level() {
792        let config = Arc::new(
793            CompactionConfigBuilder::new()
794                .level0_max_compact_file_number(20)
795                .build(),
796        );
797        let mut table_infos = vec![];
798        for epoch in 1..3 {
799            let base = epoch * 100;
800            let mut ssts = vec![];
801            for i in 1..50 {
802                let left = (i as usize) * 100;
803                let right = left + 100;
804                ssts.push(generate_table(base + i, 1, left, right, epoch));
805            }
806            table_infos.push(ssts);
807        }
808
809        let l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
810        let compaction_task_validator = Arc::new(CompactionTaskValidator::new(config.clone()));
811        let picker = WholeLevelCompactionPicker::new(config, compaction_task_validator);
812        let level_handler = LevelHandler::new(0);
813        let ret = picker
814            .pick_whole_level(&l0, &level_handler, 4, &mut LocalPickerStatistic::default())
815            .unwrap();
816        assert_eq!(ret.input_levels.len(), 2);
817    }
818
819    #[test]
820    fn test_priority() {
821        let config = Arc::new(
822            CompactionConfigBuilder::new()
823                .level0_max_compact_file_number(20)
824                .sub_level_max_compaction_bytes(1)
825                .level0_sub_level_compact_level_count(2)
826                .build(),
827        );
828        let mut table_infos = vec![];
829        for epoch in 1..3 {
830            let base = epoch * 100;
831            let mut ssts = vec![];
832            for i in 1..50 {
833                let left = (i as usize) * 100;
834                let right = left + 100;
835                ssts.push(generate_table(base + i, 1, left, right, epoch));
836            }
837            table_infos.push(ssts);
838        }
839
840        let mut l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
841        // trivial-move
842        l0.sub_levels[1]
843            .table_infos
844            .push(generate_table(9999, 900000000, 0, 100, 1));
845
846        l0.sub_levels[0].total_file_size = 1;
847        l0.sub_levels[1].total_file_size = 1;
848
849        let mut picker = IntraCompactionPicker::new_with_validator(
850            config,
851            Arc::new(CompactionTaskValidator::unused()),
852            Arc::new(CompactionDeveloperConfig::default()),
853        );
854        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
855        let mut local_stats = LocalPickerStatistic::default();
856
857        let levels = Levels {
858            l0,
859            levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
860            ..Default::default()
861        };
862
863        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
864        assert!(is_l0_trivial_move(ret.as_ref().unwrap()));
865        ret.as_ref()
866            .unwrap()
867            .add_pending_task(1, &mut levels_handler);
868        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
869        assert!(ret.is_some());
870        let input = ret.as_ref().unwrap();
871        assert_eq!(input.input_levels.len(), 2);
872        assert_ne!(
873            levels.l0.sub_levels[0].table_infos.len(),
874            input.input_levels[0].table_infos.len()
875        );
876        assert_ne!(
877            levels.l0.sub_levels[1].table_infos.len(),
878            input.input_levels[1].table_infos.len()
879        );
880    }
881}