risingwave_meta/hummock/compaction/picker/
base_level_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::config::default::compaction_config;
20use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
21use risingwave_pb::hummock::{CompactionConfig, LevelType};
22
23use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
24use super::{
25    CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
26    ValidationRuleType,
27};
28use crate::hummock::compaction::picker::TrivialMovePicker;
29use crate::hummock::compaction::{CompactionDeveloperConfig, create_overlap_strategy};
30use crate::hummock::level_handler::LevelHandler;
31
32std::thread_local! {
33    static LOG_COUNTER: RefCell<usize> = const { RefCell::new(0) };
34}
35
36pub struct LevelCompactionPicker {
37    target_level: usize,
38    config: Arc<CompactionConfig>,
39    compaction_task_validator: Arc<CompactionTaskValidator>,
40    developer_config: Arc<CompactionDeveloperConfig>,
41}
42
43impl CompactionPicker for LevelCompactionPicker {
44    fn pick_compaction(
45        &mut self,
46        levels: &Levels,
47        level_handlers: &[LevelHandler],
48        stats: &mut LocalPickerStatistic,
49    ) -> Option<CompactionInput> {
50        let l0 = &levels.l0;
51        if l0.sub_levels.is_empty() {
52            return None;
53        }
54        if l0.sub_levels[0].level_type != LevelType::Nonoverlapping
55            && l0.sub_levels[0].table_infos.len() > 1
56        {
57            stats.skip_by_overlapping += 1;
58            return None;
59        }
60
61        let is_l0_pending_compact =
62            level_handlers[0].is_level_all_pending_compact(&l0.sub_levels[0]);
63
64        if is_l0_pending_compact {
65            stats.skip_by_pending_files += 1;
66            return None;
67        }
68
69        if let Some(mut ret) = self.pick_base_trivial_move(
70            l0,
71            levels.get_level(self.target_level),
72            level_handlers,
73            stats,
74        ) {
75            ret.vnode_partition_count = self.config.split_weight_by_vnode;
76            return Some(ret);
77        }
78
79        debug_assert!(self.target_level == levels.get_level(self.target_level).level_idx as usize);
80        if let Some(ret) = self.pick_multi_level_to_base(
81            l0,
82            levels.get_level(self.target_level),
83            self.config.split_weight_by_vnode,
84            level_handlers,
85            stats,
86        ) {
87            return Some(ret);
88        }
89
90        None
91    }
92}
93
94impl LevelCompactionPicker {
95    #[cfg(test)]
96    pub fn new(
97        target_level: usize,
98        config: Arc<CompactionConfig>,
99        developer_config: Arc<CompactionDeveloperConfig>,
100    ) -> LevelCompactionPicker {
101        LevelCompactionPicker {
102            target_level,
103            compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
104            config,
105            developer_config,
106        }
107    }
108
109    pub fn new_with_validator(
110        target_level: usize,
111        config: Arc<CompactionConfig>,
112        compaction_task_validator: Arc<CompactionTaskValidator>,
113        developer_config: Arc<CompactionDeveloperConfig>,
114    ) -> LevelCompactionPicker {
115        LevelCompactionPicker {
116            target_level,
117            config,
118            compaction_task_validator,
119            developer_config,
120        }
121    }
122
123    fn pick_base_trivial_move(
124        &self,
125        l0: &OverlappingLevel,
126        target_level: &Level,
127        level_handlers: &[LevelHandler],
128        stats: &mut LocalPickerStatistic,
129    ) -> Option<CompactionInput> {
130        if !self.developer_config.enable_trivial_move {
131            return None;
132        }
133
134        let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
135        let trivial_move_picker = TrivialMovePicker::new(
136            0,
137            self.target_level,
138            overlap_strategy.clone(),
139            if self.compaction_task_validator.is_enable() {
140                // tips: Older versions of the compaction group will be upgraded without this configuration, we leave it with its default behaviour and enable it manually when needed.
141                self.config.sst_allowed_trivial_move_min_size.unwrap_or(0)
142            } else {
143                0
144            },
145            self.config
146                .sst_allowed_trivial_move_max_count
147                .unwrap_or(compaction_config::sst_allowed_trivial_move_max_count())
148                as usize,
149        );
150
151        trivial_move_picker.pick_trivial_move_task(
152            &l0.sub_levels[0].table_infos,
153            &target_level.table_infos,
154            level_handlers,
155            stats,
156        )
157    }
158
159    fn pick_multi_level_to_base(
160        &self,
161        l0: &OverlappingLevel,
162        target_level: &Level,
163        vnode_partition_count: u32,
164        level_handlers: &[LevelHandler],
165        stats: &mut LocalPickerStatistic,
166    ) -> Option<CompactionInput> {
167        let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
168        let min_compaction_bytes = self.config.sub_level_max_compaction_bytes;
169        let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
170            min_compaction_bytes,
171            // divide by 2 because we need to select files of base level and it need use the other
172            // half quota.
173            std::cmp::max(
174                self.config.max_bytes_for_level_base,
175                self.config.max_compaction_bytes / 2,
176            ),
177            1,
178            // The maximum number of sub_level compact level per task
179            self.config.level0_max_compact_file_number,
180            overlap_strategy.clone(),
181            self.developer_config.enable_check_task_level_overlap,
182            self.config
183                .max_l0_compact_level_count
184                .unwrap_or(compaction_config::max_l0_compact_level_count()) as usize,
185        );
186
187        let mut max_vnode_partition_idx = 0;
188        for (idx, level) in l0.sub_levels.iter().enumerate() {
189            if level.vnode_partition_count < vnode_partition_count {
190                break;
191            }
192            max_vnode_partition_idx = idx;
193        }
194
195        let l0_select_tables_vec = non_overlap_sub_level_picker.pick_l0_multi_non_overlap_level(
196            &l0.sub_levels[..=max_vnode_partition_idx],
197            &level_handlers[0],
198        );
199        if l0_select_tables_vec.is_empty() {
200            stats.skip_by_pending_files += 1;
201            return None;
202        }
203
204        let mut skip_by_pending = false;
205        let mut input_levels = vec![];
206
207        for input in l0_select_tables_vec {
208            let l0_select_tables = input
209                .sstable_infos
210                .iter()
211                .flat_map(|select_tables| select_tables.clone())
212                .collect_vec();
213
214            let target_level_ssts = overlap_strategy
215                .check_base_level_overlap(&l0_select_tables, &target_level.table_infos);
216
217            let mut target_level_size = 0;
218            let mut pending_compact = false;
219            for sst in &target_level_ssts {
220                if level_handlers[target_level.level_idx as usize].is_pending_compact(&sst.sst_id) {
221                    pending_compact = true;
222                    break;
223                }
224
225                target_level_size += sst.sst_size;
226            }
227
228            if pending_compact {
229                skip_by_pending = true;
230                continue;
231            }
232
233            input_levels.push((input, target_level_size, target_level_ssts));
234        }
235
236        if input_levels.is_empty() {
237            if skip_by_pending {
238                stats.skip_by_pending_files += 1;
239            }
240            return None;
241        }
242
243        for (input, target_file_size, target_level_files) in input_levels {
244            let mut select_level_inputs = input
245                .sstable_infos
246                .into_iter()
247                .map(|table_infos| InputLevel {
248                    level_idx: 0,
249                    level_type: LevelType::Nonoverlapping,
250                    table_infos,
251                })
252                .collect_vec();
253            select_level_inputs.reverse();
254            let target_file_count = target_level_files.len();
255            select_level_inputs.push(InputLevel {
256                level_idx: target_level.level_idx,
257                level_type: target_level.level_type,
258                table_infos: target_level_files,
259            });
260
261            let result = CompactionInput {
262                input_levels: select_level_inputs,
263                target_level: self.target_level,
264                select_input_size: input.total_file_size,
265                target_input_size: target_file_size,
266                total_file_count: (input.total_file_count + target_file_count) as u64,
267                vnode_partition_count,
268                ..Default::default()
269            };
270
271            if !self.compaction_task_validator.valid_compact_task(
272                &result,
273                ValidationRuleType::ToBase,
274                stats,
275            ) {
276                if l0.total_file_size > target_level.total_file_size * 8 {
277                    let log_counter = LOG_COUNTER.with_borrow_mut(|counter| {
278                        *counter += 1;
279                        *counter
280                    });
281
282                    // reduce log
283                    if log_counter % 100 == 0 {
284                        tracing::warn!(
285                            "skip task with level count: {}, file count: {}, select size: {}, target size: {}, target level size: {}",
286                            result.input_levels.len(),
287                            result.total_file_count,
288                            result.select_input_size,
289                            result.target_input_size,
290                            target_level.total_file_size,
291                        );
292                    }
293                }
294                continue;
295            }
296
297            return Some(result);
298        }
299        None
300    }
301}
302
303#[cfg(test)]
304pub mod tests {
305
306    use super::*;
307    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
308    use crate::hummock::compaction::selector::tests::*;
309    use crate::hummock::compaction::{CompactionMode, TierCompactionPicker};
310
311    fn create_compaction_picker_for_test() -> LevelCompactionPicker {
312        let config = Arc::new(
313            CompactionConfigBuilder::new()
314                .level0_tier_compact_file_number(2)
315                .level0_sub_level_compact_level_count(1)
316                .build(),
317        );
318        LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()))
319    }
320
321    #[test]
322    fn test_compact_l0_to_l1() {
323        let mut picker = create_compaction_picker_for_test();
324        let l0 = generate_level(
325            0,
326            vec![
327                generate_table(5, 1, 100, 200, 2),
328                generate_table(4, 1, 201, 300, 2),
329            ],
330        );
331        let mut levels = Levels {
332            l0: OverlappingLevel {
333                total_file_size: l0.total_file_size,
334                uncompressed_file_size: l0.total_file_size,
335                sub_levels: vec![l0],
336            },
337            levels: vec![generate_level(
338                1,
339                vec![
340                    generate_table(3, 1, 1, 100, 1),
341                    generate_table(2, 1, 101, 150, 1),
342                    generate_table(1, 1, 201, 210, 1),
343                ],
344            )],
345            ..Default::default()
346        };
347        let mut local_stats = LocalPickerStatistic::default();
348        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
349
350        let ret = picker
351            .pick_compaction(&levels, &levels_handler, &mut local_stats)
352            .unwrap();
353        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
354        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 4);
355        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 1);
356
357        ret.add_pending_task(0, &mut levels_handler);
358        {
359            push_table_level0_nonoverlapping(&mut levels, generate_table(6, 1, 100, 200, 2));
360            push_table_level0_nonoverlapping(&mut levels, generate_table(7, 1, 301, 333, 4));
361            let ret2 = picker
362                .pick_compaction(&levels, &levels_handler, &mut local_stats)
363                .unwrap();
364
365            assert_eq!(ret2.input_levels[0].table_infos.len(), 1);
366            assert_eq!(ret2.input_levels[0].table_infos[0].sst_id, 6);
367            assert_eq!(ret2.input_levels[1].table_infos[0].sst_id, 5);
368        }
369
370        levels.l0.sub_levels[0]
371            .table_infos
372            .retain(|table| table.sst_id != 4);
373        levels.l0.total_file_size -= ret.input_levels[0].table_infos[0].file_size;
374
375        levels_handler[0].remove_task(0);
376        levels_handler[1].remove_task(0);
377
378        let ret = picker
379            .pick_compaction(&levels, &levels_handler, &mut local_stats)
380            .unwrap();
381        assert_eq!(ret.input_levels.len(), 3);
382        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
383        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
384        assert_eq!(ret.input_levels[2].table_infos.len(), 2);
385        assert_eq!(ret.input_levels[2].table_infos[0].sst_id, 3);
386        assert_eq!(ret.input_levels[2].table_infos[1].sst_id, 2);
387        ret.add_pending_task(1, &mut levels_handler);
388
389        let mut local_stats = LocalPickerStatistic::default();
390        // Cannot pick because no idle table in sub-level[0]. (And sub-level[0] is pending
391        // actually).
392        push_table_level0_overlapping(&mut levels, generate_table(8, 1, 199, 233, 3));
393        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
394        assert!(ret.is_none());
395
396        // Don't pick overlapping sub-level 8
397        levels_handler[0].remove_task(1);
398        levels_handler[1].remove_task(1);
399        let ret = picker
400            .pick_compaction(&levels, &levels_handler, &mut local_stats)
401            .unwrap();
402        assert_eq!(ret.input_levels.len(), 3);
403        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
404        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
405        assert_eq!(ret.input_levels[2].table_infos.len(), 2);
406    }
407    #[test]
408    fn test_selecting_key_range_overlap() {
409        // When picking L0->L1, all L1 files overlapped with selecting_key_range should be picked.
410        let config = Arc::new(
411            CompactionConfigBuilder::new()
412                .level0_tier_compact_file_number(2)
413                .compaction_mode(CompactionMode::Range as i32)
414                .level0_sub_level_compact_level_count(1)
415                .build(),
416        );
417        let mut picker =
418            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
419
420        let levels = vec![Level {
421            level_idx: 1,
422            level_type: LevelType::Nonoverlapping,
423            table_infos: vec![
424                generate_table(3, 1, 0, 50, 1),
425                generate_table(4, 1, 150, 200, 1),
426                generate_table(5, 1, 250, 300, 1),
427            ],
428            ..Default::default()
429        }];
430        let mut levels = Levels {
431            levels,
432            l0: OverlappingLevel {
433                sub_levels: vec![],
434                total_file_size: 0,
435                uncompressed_file_size: 0,
436            },
437            ..Default::default()
438        };
439        push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(1, 1, 50, 140, 2)]);
440        push_tables_level0_nonoverlapping(
441            &mut levels,
442            vec![
443                generate_table(7, 1, 200, 250, 2),
444                generate_table(8, 1, 400, 500, 2),
445            ],
446        );
447
448        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
449
450        let mut local_stats = LocalPickerStatistic::default();
451        let ret = picker
452            .pick_compaction(&levels, &levels_handler, &mut local_stats)
453            .unwrap();
454
455        // pick
456        // l0 [sst_1]
457        // l1 [sst_3]
458        assert_eq!(ret.input_levels.len(), 2);
459        assert_eq!(
460            ret.input_levels[0]
461                .table_infos
462                .iter()
463                .map(|t| t.sst_id)
464                .collect_vec(),
465            vec![1]
466        );
467
468        assert_eq!(
469            ret.input_levels[1]
470                .table_infos
471                .iter()
472                .map(|t| t.sst_id)
473                .collect_vec(),
474            vec![3,]
475        );
476    }
477
478    #[test]
479    fn test_l0_to_l1_compact_conflict() {
480        // When picking L0->L1, L0's selecting_key_range should not be overlapped with L0's
481        // compacting_key_range.
482        let mut picker = create_compaction_picker_for_test();
483        let levels = vec![Level {
484            level_idx: 1,
485            level_type: LevelType::Nonoverlapping,
486            table_infos: vec![],
487            total_file_size: 0,
488            sub_level_id: 0,
489            uncompressed_file_size: 0,
490            ..Default::default()
491        }];
492        let mut levels = Levels {
493            levels,
494            l0: OverlappingLevel {
495                sub_levels: vec![],
496                total_file_size: 0,
497                uncompressed_file_size: 0,
498            },
499            ..Default::default()
500        };
501        push_tables_level0_nonoverlapping(
502            &mut levels,
503            vec![
504                generate_table(1, 1, 100, 300, 2),
505                generate_table(2, 1, 350, 500, 2),
506            ],
507        );
508        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
509
510        let mut local_stats = LocalPickerStatistic::default();
511        let ret = picker
512            .pick_compaction(&levels, &levels_handler, &mut local_stats)
513            .unwrap();
514        // trivial_move
515        ret.add_pending_task(0, &mut levels_handler); // pending only for test
516        push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]);
517        let config: CompactionConfig = CompactionConfigBuilder::new()
518            .level0_tier_compact_file_number(2)
519            .max_compaction_bytes(1000)
520            .sub_level_max_compaction_bytes(150)
521            .max_bytes_for_level_multiplier(1)
522            .level0_sub_level_compact_level_count(3)
523            .build();
524        let mut picker = TierCompactionPicker::new(Arc::new(config));
525
526        let ret: Option<CompactionInput> =
527            picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
528        assert!(ret.is_none());
529    }
530
531    #[test]
532    fn test_skip_compact_write_amplification_limit() {
533        let config: CompactionConfig = CompactionConfigBuilder::new()
534            .level0_tier_compact_file_number(2)
535            .max_compaction_bytes(1000)
536            .sub_level_max_compaction_bytes(150)
537            .max_bytes_for_level_multiplier(1)
538            .level0_sub_level_compact_level_count(2)
539            .build();
540        let mut picker = LevelCompactionPicker::new(
541            1,
542            Arc::new(config),
543            Arc::new(CompactionDeveloperConfig::default()),
544        );
545
546        let mut levels = Levels {
547            levels: vec![Level {
548                level_idx: 1,
549                level_type: LevelType::Nonoverlapping,
550                table_infos: vec![
551                    generate_table(1, 1, 100, 399, 2),
552                    generate_table(2, 1, 400, 699, 2),
553                    generate_table(3, 1, 700, 999, 2),
554                ],
555                total_file_size: 900,
556                sub_level_id: 0,
557                uncompressed_file_size: 900,
558                ..Default::default()
559            }],
560            l0: generate_l0_nonoverlapping_sublevels(vec![]),
561            ..Default::default()
562        };
563        push_tables_level0_nonoverlapping(
564            &mut levels,
565            vec![
566                generate_table(4, 1, 100, 180, 2),
567                generate_table(5, 1, 400, 450, 2),
568                generate_table(6, 1, 600, 700, 2),
569            ],
570        );
571
572        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
573        let mut local_stats = LocalPickerStatistic::default();
574        levels_handler[0].add_pending_task(1, 4, &levels.l0.sub_levels[0].table_infos);
575        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
576        // Skip this compaction because the write amplification is too large.
577        assert!(ret.is_none());
578    }
579
580    #[test]
581    fn test_l0_to_l1_break_on_exceed_compaction_size() {
582        let mut local_stats = LocalPickerStatistic::default();
583        let mut l0 = generate_l0_overlapping_sublevels(vec![
584            vec![
585                generate_table(4, 1, 10, 90, 1),
586                generate_table(5, 1, 210, 220, 1),
587            ],
588            vec![generate_table(6, 1, 0, 100000, 1)],
589            vec![generate_table(7, 1, 0, 100000, 1)],
590        ]);
591        // We can set level_type only because the input above is valid.
592        for s in &mut l0.sub_levels {
593            s.level_type = LevelType::Nonoverlapping;
594        }
595        let levels = Levels {
596            l0,
597            levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])],
598            ..Default::default()
599        };
600        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
601
602        // Pick with large max_compaction_bytes results all sub levels included in input.
603        let config = Arc::new(
604            CompactionConfigBuilder::new()
605                .max_compaction_bytes(500000)
606                .sub_level_max_compaction_bytes(50000)
607                .max_bytes_for_level_base(500000)
608                .level0_sub_level_compact_level_count(1)
609                .build(),
610        );
611        // Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION.
612        // So all sub-levels are included to make write amplification < MAX_WRITE_AMPLIFICATION.
613        let mut picker =
614            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
615        let ret = picker
616            .pick_compaction(&levels, &levels_handler, &mut local_stats)
617            .unwrap();
618        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 7);
619        assert_eq!(
620            3,
621            ret.input_levels.iter().filter(|l| l.level_idx == 0).count()
622        );
623        assert_eq!(
624            4,
625            ret.input_levels
626                .iter()
627                .filter(|l| l.level_idx == 0)
628                .map(|l| l.table_infos.len())
629                .sum::<usize>()
630        );
631
632        // Pick with small max_compaction_bytes results partial sub levels included in input.
633        let config = Arc::new(
634            CompactionConfigBuilder::new()
635                .max_compaction_bytes(100010)
636                .max_bytes_for_level_base(512)
637                .level0_sub_level_compact_level_count(1)
638                .build(),
639        );
640        let mut picker =
641            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
642
643        let ret = picker
644            .pick_compaction(&levels, &levels_handler, &mut local_stats)
645            .unwrap();
646        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
647        assert_eq!(
648            2,
649            ret.input_levels.iter().filter(|l| l.level_idx == 0).count()
650        );
651        assert_eq!(
652            3,
653            ret.input_levels
654                .iter()
655                .filter(|l| l.level_idx == 0)
656                .map(|l| l.table_infos.len())
657                .sum::<usize>()
658        );
659    }
660
661    #[test]
662    fn test_l0_to_l1_break_on_pending_sub_level() {
663        let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
664            vec![
665                generate_table(4, 1, 10, 90, 1),
666                generate_table(5, 1, 210, 220, 1),
667            ],
668            vec![generate_table(6, 1, 0, 100000, 1)],
669            vec![generate_table(7, 1, 0, 100000, 1)],
670        ]);
671
672        let levels = Levels {
673            l0,
674            levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])],
675            ..Default::default()
676        };
677        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
678        let mut local_stats = LocalPickerStatistic::default();
679
680        // Create a pending sub-level.
681        let pending_level = levels.l0.sub_levels[1].clone();
682        assert_eq!(pending_level.sub_level_id, 1);
683        let tier_task_input = CompactionInput {
684            input_levels: vec![InputLevel {
685                level_idx: 0,
686                level_type: pending_level.level_type,
687                table_infos: pending_level.table_infos.clone(),
688            }],
689            target_level: 1,
690            target_sub_level_id: pending_level.sub_level_id,
691            ..Default::default()
692        };
693        assert!(!levels_handler[0].is_level_pending_compact(&pending_level));
694        tier_task_input.add_pending_task(1, &mut levels_handler);
695        assert!(levels_handler[0].is_level_pending_compact(&pending_level));
696
697        // Pick with large max_compaction_bytes results all sub levels included in input.
698        let config = Arc::new(
699            CompactionConfigBuilder::new()
700                .max_compaction_bytes(500000)
701                .level0_sub_level_compact_level_count(2)
702                .build(),
703        );
704
705        // Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION.
706        // But stopped by pending sub-level when trying to include more sub-levels.
707        let mut picker = LevelCompactionPicker::new(
708            1,
709            config.clone(),
710            Arc::new(CompactionDeveloperConfig::default()),
711        );
712        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
713        assert!(ret.is_none());
714
715        // Free the pending sub-level.
716        for pending_task_id in &levels_handler[0].pending_tasks_ids() {
717            levels_handler[0].remove_task(*pending_task_id);
718        }
719
720        // No more pending sub-level so we can get a task now.
721        let mut picker =
722            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
723        picker
724            .pick_compaction(&levels, &levels_handler, &mut local_stats)
725            .unwrap();
726    }
727
728    #[test]
729    fn test_l0_to_base_when_all_base_pending() {
730        let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
731            vec![
732                generate_table(4, 1, 10, 90, 1),
733                generate_table(5, 1, 1000, 2000, 1),
734            ],
735            vec![generate_table(6, 1, 10, 90, 1)],
736        ]);
737
738        let levels = Levels {
739            l0,
740            levels: vec![generate_level(1, vec![generate_table(3, 1, 1, 100, 1)])],
741            ..Default::default()
742        };
743        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
744        let mut local_stats = LocalPickerStatistic::default();
745
746        let config = Arc::new(
747            CompactionConfigBuilder::new()
748                .max_compaction_bytes(500000)
749                .level0_sub_level_compact_level_count(2)
750                .sub_level_max_compaction_bytes(1000)
751                .build(),
752        );
753
754        let mut picker =
755            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
756        let ret = picker
757            .pick_compaction(&levels, &levels_handler, &mut local_stats)
758            .unwrap();
759        // 1. trivial_move
760        assert_eq!(2, ret.input_levels.len());
761        assert!(ret.input_levels[1].table_infos.is_empty());
762        assert_eq!(5, ret.input_levels[0].table_infos[0].sst_id);
763        ret.add_pending_task(0, &mut levels_handler);
764
765        let ret = picker
766            .pick_compaction(&levels, &levels_handler, &mut local_stats)
767            .unwrap();
768        assert_eq!(3, ret.input_levels.len());
769        assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id);
770    }
771}