risingwave_meta/hummock/compaction/picker/
base_level_compaction_picker.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::config::meta::default::compaction_config;
20use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
21use risingwave_pb::hummock::{CompactionConfig, LevelType};
22
23use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
24use super::{
25    CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
26    ValidationRuleType,
27};
28use crate::hummock::compaction::picker::TrivialMovePicker;
29use crate::hummock::compaction::{CompactionDeveloperConfig, create_overlap_strategy};
30use crate::hummock::level_handler::LevelHandler;
31
32std::thread_local! {
33    static LOG_COUNTER: RefCell<usize> = const { RefCell::new(0) };
34}
35
36pub struct LevelCompactionPicker {
37    target_level: usize,
38    config: Arc<CompactionConfig>,
39    compaction_task_validator: Arc<CompactionTaskValidator>,
40    developer_config: Arc<CompactionDeveloperConfig>,
41}
42
43impl CompactionPicker for LevelCompactionPicker {
44    fn pick_compaction(
45        &mut self,
46        levels: &Levels,
47        level_handlers: &[LevelHandler],
48        stats: &mut LocalPickerStatistic,
49    ) -> Option<CompactionInput> {
50        let l0 = &levels.l0;
51        if l0.sub_levels.is_empty() {
52            return None;
53        }
54        if l0.sub_levels[0].level_type != LevelType::Nonoverlapping
55            && l0.sub_levels[0].table_infos.len() > 1
56        {
57            stats.skip_by_overlapping += 1;
58            return None;
59        }
60
61        let is_l0_pending_compact =
62            level_handlers[0].is_level_all_pending_compact(&l0.sub_levels[0]);
63
64        if is_l0_pending_compact {
65            stats.skip_by_pending_files += 1;
66            return None;
67        }
68
69        if let Some(mut ret) = self.pick_base_trivial_move(
70            l0,
71            levels.get_level(self.target_level),
72            level_handlers,
73            stats,
74        ) {
75            ret.vnode_partition_count = self.config.split_weight_by_vnode;
76            return Some(ret);
77        }
78
79        debug_assert!(self.target_level == levels.get_level(self.target_level).level_idx as usize);
80        if let Some(ret) = self.pick_multi_level_to_base(
81            l0,
82            levels.get_level(self.target_level),
83            self.config.split_weight_by_vnode,
84            level_handlers,
85            stats,
86        ) {
87            return Some(ret);
88        }
89
90        None
91    }
92}
93
94impl LevelCompactionPicker {
95    #[cfg(test)]
96    pub fn new(
97        target_level: usize,
98        config: Arc<CompactionConfig>,
99        developer_config: Arc<CompactionDeveloperConfig>,
100    ) -> LevelCompactionPicker {
101        LevelCompactionPicker {
102            target_level,
103            compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
104            config,
105            developer_config,
106        }
107    }
108
109    pub fn new_with_validator(
110        target_level: usize,
111        config: Arc<CompactionConfig>,
112        compaction_task_validator: Arc<CompactionTaskValidator>,
113        developer_config: Arc<CompactionDeveloperConfig>,
114    ) -> LevelCompactionPicker {
115        LevelCompactionPicker {
116            target_level,
117            config,
118            compaction_task_validator,
119            developer_config,
120        }
121    }
122
123    fn pick_base_trivial_move(
124        &self,
125        l0: &OverlappingLevel,
126        target_level: &Level,
127        level_handlers: &[LevelHandler],
128        stats: &mut LocalPickerStatistic,
129    ) -> Option<CompactionInput> {
130        if !self.developer_config.enable_trivial_move {
131            return None;
132        }
133
134        let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
135        let trivial_move_picker = TrivialMovePicker::new(
136            0,
137            self.target_level,
138            overlap_strategy.clone(),
139            if self.compaction_task_validator.is_enable() {
140                // tips: Older versions of the compaction group will be upgraded without this configuration, we leave it with its default behaviour and enable it manually when needed.
141                self.config.sst_allowed_trivial_move_min_size.unwrap_or(0)
142            } else {
143                0
144            },
145            self.config
146                .sst_allowed_trivial_move_max_count
147                .unwrap_or(compaction_config::sst_allowed_trivial_move_max_count())
148                as usize,
149        );
150
151        trivial_move_picker.pick_trivial_move_task(
152            &l0.sub_levels[0].table_infos,
153            &target_level.table_infos,
154            level_handlers,
155            stats,
156        )
157    }
158
159    fn pick_multi_level_to_base(
160        &self,
161        l0: &OverlappingLevel,
162        target_level: &Level,
163        vnode_partition_count: u32,
164        level_handlers: &[LevelHandler],
165        stats: &mut LocalPickerStatistic,
166    ) -> Option<CompactionInput> {
167        let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
168        let min_compaction_bytes = self.config.sub_level_max_compaction_bytes;
169        let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
170            min_compaction_bytes,
171            // divide by 2 because we need to select files of base level and it need use the other
172            // half quota.
173            std::cmp::max(
174                self.config.max_bytes_for_level_base,
175                self.config.max_compaction_bytes / 2,
176            ),
177            1,
178            // The maximum number of sub_level compact level per task
179            self.config.level0_max_compact_file_number,
180            overlap_strategy.clone(),
181            self.developer_config.enable_check_task_level_overlap,
182            self.config
183                .max_l0_compact_level_count
184                .unwrap_or(compaction_config::max_l0_compact_level_count()) as usize,
185            self.config
186                .enable_optimize_l0_interval_selection
187                .unwrap_or(compaction_config::enable_optimize_l0_interval_selection()),
188        );
189
190        let mut max_vnode_partition_idx = 0;
191        for (idx, level) in l0.sub_levels.iter().enumerate() {
192            if level.vnode_partition_count < vnode_partition_count {
193                break;
194            }
195            max_vnode_partition_idx = idx;
196        }
197
198        let l0_select_tables_vec = non_overlap_sub_level_picker.pick_l0_multi_non_overlap_level(
199            &l0.sub_levels[..=max_vnode_partition_idx],
200            &level_handlers[0],
201        );
202        if l0_select_tables_vec.is_empty() {
203            stats.skip_by_pending_files += 1;
204            return None;
205        }
206
207        let mut skip_by_pending = false;
208        let mut input_levels = vec![];
209
210        for input in l0_select_tables_vec {
211            let l0_select_tables = input
212                .sstable_infos
213                .iter()
214                .flat_map(|(_, select_tables)| select_tables.clone())
215                .collect_vec();
216
217            let target_level_ssts = overlap_strategy
218                .check_base_level_overlap(&l0_select_tables, &target_level.table_infos);
219
220            let mut target_level_size = 0;
221            let mut pending_compact = false;
222            for sst in &target_level_ssts {
223                if level_handlers[target_level.level_idx as usize].is_pending_compact(&sst.sst_id) {
224                    pending_compact = true;
225                    break;
226                }
227
228                target_level_size += sst.sst_size;
229            }
230
231            if pending_compact {
232                skip_by_pending = true;
233                continue;
234            }
235
236            input_levels.push((input, target_level_size, target_level_ssts));
237        }
238
239        if input_levels.is_empty() {
240            if skip_by_pending {
241                stats.skip_by_pending_files += 1;
242            }
243            return None;
244        }
245
246        for (input, target_file_size, target_level_files) in input_levels {
247            let mut select_level_inputs = input
248                .sstable_infos
249                .into_iter()
250                .map(|(_, table_infos)| InputLevel {
251                    level_idx: 0,
252                    level_type: LevelType::Nonoverlapping,
253                    table_infos,
254                })
255                .collect_vec();
256            select_level_inputs.reverse();
257            let target_file_count = target_level_files.len();
258            select_level_inputs.push(InputLevel {
259                level_idx: target_level.level_idx,
260                level_type: target_level.level_type,
261                table_infos: target_level_files,
262            });
263
264            let result = CompactionInput {
265                input_levels: select_level_inputs,
266                target_level: self.target_level,
267                select_input_size: input.total_file_size,
268                target_input_size: target_file_size,
269                total_file_count: (input.total_file_count + target_file_count) as u64,
270                vnode_partition_count,
271                ..Default::default()
272            };
273
274            if !self.compaction_task_validator.valid_compact_task(
275                &result,
276                ValidationRuleType::ToBase,
277                stats,
278            ) {
279                if l0.total_file_size > target_level.total_file_size * 8 {
280                    let log_counter = LOG_COUNTER.with_borrow_mut(|counter| {
281                        *counter += 1;
282                        *counter
283                    });
284
285                    // reduce log
286                    if log_counter.is_multiple_of(100) {
287                        tracing::warn!(
288                            "skip task with level count: {}, file count: {}, select size: {}, target size: {}, target level size: {}",
289                            result.input_levels.len(),
290                            result.total_file_count,
291                            result.select_input_size,
292                            result.target_input_size,
293                            target_level.total_file_size,
294                        );
295                    }
296                }
297                continue;
298            }
299
300            return Some(result);
301        }
302        None
303    }
304}
305
306#[cfg(test)]
307pub mod tests {
308
309    use super::*;
310    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
311    use crate::hummock::compaction::selector::tests::*;
312    use crate::hummock::compaction::{CompactionMode, TierCompactionPicker};
313
314    fn create_compaction_picker_for_test() -> LevelCompactionPicker {
315        let config = Arc::new(
316            CompactionConfigBuilder::new()
317                .level0_tier_compact_file_number(2)
318                .level0_sub_level_compact_level_count(1)
319                .build(),
320        );
321        LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()))
322    }
323
324    #[test]
325    fn test_compact_l0_to_l1() {
326        let mut picker = create_compaction_picker_for_test();
327        let l0 = generate_level(
328            0,
329            vec![
330                generate_table(5, 1, 100, 200, 2),
331                generate_table(4, 1, 201, 300, 2),
332            ],
333        );
334        let mut levels = Levels {
335            l0: OverlappingLevel {
336                total_file_size: l0.total_file_size,
337                uncompressed_file_size: l0.total_file_size,
338                sub_levels: vec![l0],
339            },
340            levels: vec![generate_level(
341                1,
342                vec![
343                    generate_table(3, 1, 1, 100, 1),
344                    generate_table(2, 1, 101, 150, 1),
345                    generate_table(1, 1, 201, 210, 1),
346                ],
347            )],
348            ..Default::default()
349        };
350        let mut local_stats = LocalPickerStatistic::default();
351        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
352
353        let ret = picker
354            .pick_compaction(&levels, &levels_handler, &mut local_stats)
355            .unwrap();
356        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
357        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 4);
358        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 1);
359
360        ret.add_pending_task(0, &mut levels_handler);
361        {
362            push_table_level0_nonoverlapping(&mut levels, generate_table(6, 1, 100, 200, 2));
363            push_table_level0_nonoverlapping(&mut levels, generate_table(7, 1, 301, 333, 4));
364            let ret2 = picker
365                .pick_compaction(&levels, &levels_handler, &mut local_stats)
366                .unwrap();
367
368            assert_eq!(ret2.input_levels[0].table_infos.len(), 1);
369            assert_eq!(ret2.input_levels[0].table_infos[0].sst_id, 6);
370            assert_eq!(ret2.input_levels[1].table_infos[0].sst_id, 5);
371        }
372
373        levels.l0.sub_levels[0]
374            .table_infos
375            .retain(|table| table.sst_id != 4);
376        levels.l0.total_file_size -= ret.input_levels[0].table_infos[0].file_size;
377
378        levels_handler[0].remove_task(0);
379        levels_handler[1].remove_task(0);
380
381        let ret = picker
382            .pick_compaction(&levels, &levels_handler, &mut local_stats)
383            .unwrap();
384        assert_eq!(ret.input_levels.len(), 3);
385        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
386        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
387        assert_eq!(ret.input_levels[2].table_infos.len(), 2);
388        assert_eq!(ret.input_levels[2].table_infos[0].sst_id, 3);
389        assert_eq!(ret.input_levels[2].table_infos[1].sst_id, 2);
390        ret.add_pending_task(1, &mut levels_handler);
391
392        let mut local_stats = LocalPickerStatistic::default();
393        // Cannot pick because no idle table in sub-level[0]. (And sub-level[0] is pending
394        // actually).
395        push_table_level0_overlapping(&mut levels, generate_table(8, 1, 199, 233, 3));
396        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
397        assert!(ret.is_none());
398
399        // Don't pick overlapping sub-level 8
400        levels_handler[0].remove_task(1);
401        levels_handler[1].remove_task(1);
402        let ret = picker
403            .pick_compaction(&levels, &levels_handler, &mut local_stats)
404            .unwrap();
405        assert_eq!(ret.input_levels.len(), 3);
406        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
407        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
408        assert_eq!(ret.input_levels[2].table_infos.len(), 2);
409    }
410
411    #[test]
412    fn test_selecting_key_range_overlap() {
413        // When picking L0->L1, all L1 files overlapped with selecting_key_range should be picked.
414        let config = Arc::new(
415            CompactionConfigBuilder::new()
416                .level0_tier_compact_file_number(2)
417                .compaction_mode(CompactionMode::Range as i32)
418                .level0_sub_level_compact_level_count(1)
419                .enable_optimize_l0_interval_selection(Some(false))
420                .build(),
421        );
422
423        let config_enable_optimize_l0_interval_selection = Arc::new(
424            CompactionConfigBuilder::new()
425                .level0_tier_compact_file_number(2)
426                .compaction_mode(CompactionMode::Range as i32)
427                .level0_sub_level_compact_level_count(1)
428                .enable_optimize_l0_interval_selection(Some(true))
429                .build(),
430        );
431
432        let mut picker =
433            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
434
435        let mut picker_enable_optimize_l0_interval_selection = LevelCompactionPicker::new(
436            1,
437            config_enable_optimize_l0_interval_selection,
438            Arc::new(CompactionDeveloperConfig::default()),
439        );
440
441        let levels = vec![Level {
442            level_idx: 1,
443            level_type: LevelType::Nonoverlapping,
444            table_infos: vec![
445                generate_table(3, 1, 0, 50, 1),
446                generate_table(4, 1, 150, 180, 1),
447                generate_table(5, 1, 250, 300, 1),
448            ],
449            ..Default::default()
450        }];
451        let mut levels = Levels {
452            levels,
453            l0: OverlappingLevel {
454                sub_levels: vec![],
455                total_file_size: 0,
456                uncompressed_file_size: 0,
457            },
458            ..Default::default()
459        };
460        push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(1, 1, 50, 140, 2)]);
461        push_tables_level0_nonoverlapping(
462            &mut levels,
463            vec![
464                generate_table(7, 1, 200, 250, 2),
465                generate_table(8, 1, 400, 500, 2),
466            ],
467        );
468
469        {
470            let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
471
472            let mut local_stats = LocalPickerStatistic::default();
473            let ret = picker_enable_optimize_l0_interval_selection
474                .pick_compaction(&levels, &levels_handler, &mut local_stats)
475                .unwrap();
476
477            // pick
478            // l0 [sst_8]
479            assert_eq!(ret.input_levels.len(), 2);
480            assert_eq!(
481                ret.input_levels[0]
482                    .table_infos
483                    .iter()
484                    .map(|t| t.sst_id)
485                    .collect_vec(),
486                vec![8]
487            );
488            // trivial_move
489            assert!(ret.input_levels[1].table_infos.is_empty());
490
491            ret.add_pending_task(0, &mut levels_handler);
492
493            let ret = picker_enable_optimize_l0_interval_selection
494                .pick_compaction(&levels, &levels_handler, &mut local_stats)
495                .unwrap();
496
497            assert_eq!(ret.input_levels.len(), 2);
498            assert_eq!(
499                ret.input_levels[0]
500                    .table_infos
501                    .iter()
502                    .map(|t| t.sst_id)
503                    .collect_vec(),
504                vec![7]
505            );
506
507            assert_eq!(
508                ret.input_levels[1]
509                    .table_infos
510                    .iter()
511                    .map(|t| t.sst_id)
512                    .collect_vec(),
513                vec![5]
514            );
515        }
516
517        {
518            let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
519
520            let mut local_stats = LocalPickerStatistic::default();
521            let ret = picker
522                .pick_compaction(&levels, &levels_handler, &mut local_stats)
523                .unwrap();
524
525            // pick
526            assert_eq!(ret.input_levels.len(), 2);
527            assert_eq!(
528                ret.input_levels[0]
529                    .table_infos
530                    .iter()
531                    .map(|t| t.sst_id.inner())
532                    .collect_vec(),
533                vec![1]
534            );
535
536            assert_eq!(
537                ret.input_levels[1]
538                    .table_infos
539                    .iter()
540                    .map(|t| t.sst_id.inner())
541                    .collect_vec(),
542                vec![3]
543            );
544        }
545    }
546
547    #[test]
548    fn test_l0_to_l1_compact_conflict() {
549        // When picking L0->L1, L0's selecting_key_range should not be overlapped with L0's
550        // compacting_key_range.
551        let mut picker = create_compaction_picker_for_test();
552        let levels = vec![Level {
553            level_idx: 1,
554            level_type: LevelType::Nonoverlapping,
555            table_infos: vec![],
556            total_file_size: 0,
557            sub_level_id: 0,
558            uncompressed_file_size: 0,
559            ..Default::default()
560        }];
561        let mut levels = Levels {
562            levels,
563            l0: OverlappingLevel {
564                sub_levels: vec![],
565                total_file_size: 0,
566                uncompressed_file_size: 0,
567            },
568            ..Default::default()
569        };
570        push_tables_level0_nonoverlapping(
571            &mut levels,
572            vec![
573                generate_table(1, 1, 100, 300, 2),
574                generate_table(2, 1, 350, 500, 2),
575            ],
576        );
577        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
578
579        let mut local_stats = LocalPickerStatistic::default();
580        let ret = picker
581            .pick_compaction(&levels, &levels_handler, &mut local_stats)
582            .unwrap();
583        // trivial_move
584        ret.add_pending_task(0, &mut levels_handler); // pending only for test
585        push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]);
586        let config: CompactionConfig = CompactionConfigBuilder::new()
587            .level0_tier_compact_file_number(2)
588            .max_compaction_bytes(1000)
589            .sub_level_max_compaction_bytes(150)
590            .max_bytes_for_level_multiplier(1)
591            .level0_sub_level_compact_level_count(3)
592            .build();
593        let mut picker = TierCompactionPicker::new(Arc::new(config));
594
595        let ret: Option<CompactionInput> =
596            picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
597        assert!(ret.is_none());
598    }
599
600    #[test]
601    fn test_skip_compact_write_amplification_limit() {
602        let config: CompactionConfig = CompactionConfigBuilder::new()
603            .level0_tier_compact_file_number(2)
604            .max_compaction_bytes(1000)
605            .sub_level_max_compaction_bytes(150)
606            .max_bytes_for_level_multiplier(1)
607            .level0_sub_level_compact_level_count(2)
608            .build();
609        let mut picker = LevelCompactionPicker::new(
610            1,
611            Arc::new(config),
612            Arc::new(CompactionDeveloperConfig::default()),
613        );
614
615        let mut levels = Levels {
616            levels: vec![Level {
617                level_idx: 1,
618                level_type: LevelType::Nonoverlapping,
619                table_infos: vec![
620                    generate_table(1, 1, 100, 399, 2),
621                    generate_table(2, 1, 400, 699, 2),
622                    generate_table(3, 1, 700, 999, 2),
623                ],
624                total_file_size: 900,
625                sub_level_id: 0,
626                uncompressed_file_size: 900,
627                ..Default::default()
628            }],
629            l0: generate_l0_nonoverlapping_sublevels(vec![]),
630            ..Default::default()
631        };
632        push_tables_level0_nonoverlapping(
633            &mut levels,
634            vec![
635                generate_table(4, 1, 100, 180, 2),
636                generate_table(5, 1, 400, 450, 2),
637                generate_table(6, 1, 600, 700, 2),
638            ],
639        );
640
641        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
642        let mut local_stats = LocalPickerStatistic::default();
643        levels_handler[0].add_pending_task(1, 4, &levels.l0.sub_levels[0].table_infos);
644        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
645        // Skip this compaction because the write amplification is too large.
646        assert!(ret.is_none());
647    }
648
649    #[test]
650    fn test_l0_to_l1_break_on_exceed_compaction_size() {
651        let mut local_stats = LocalPickerStatistic::default();
652        let mut l0 = generate_l0_overlapping_sublevels(vec![
653            vec![
654                generate_table(4, 1, 10, 90, 1),
655                generate_table(5, 1, 210, 220, 1),
656            ],
657            vec![generate_table(6, 1, 0, 100000, 1)],
658            vec![generate_table(7, 1, 0, 100000, 1)],
659        ]);
660        // We can set level_type only because the input above is valid.
661        for s in &mut l0.sub_levels {
662            s.level_type = LevelType::Nonoverlapping;
663        }
664        let levels = Levels {
665            l0,
666            levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])],
667            ..Default::default()
668        };
669        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
670
671        // Pick with large max_compaction_bytes results all sub levels included in input.
672        let config = Arc::new(
673            CompactionConfigBuilder::new()
674                .max_compaction_bytes(500000)
675                .sub_level_max_compaction_bytes(50000)
676                .max_bytes_for_level_base(500000)
677                .level0_sub_level_compact_level_count(1)
678                .build(),
679        );
680        // Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION.
681        // So all sub-levels are included to make write amplification < MAX_WRITE_AMPLIFICATION.
682        let mut picker =
683            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
684        let ret = picker
685            .pick_compaction(&levels, &levels_handler, &mut local_stats)
686            .unwrap();
687        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 7);
688        assert_eq!(
689            3,
690            ret.input_levels.iter().filter(|l| l.level_idx == 0).count()
691        );
692        assert_eq!(
693            4,
694            ret.input_levels
695                .iter()
696                .filter(|l| l.level_idx == 0)
697                .map(|l| l.table_infos.len())
698                .sum::<usize>()
699        );
700
701        // Pick with small max_compaction_bytes results partial sub levels included in input.
702        let config = Arc::new(
703            CompactionConfigBuilder::new()
704                .max_compaction_bytes(100010)
705                .max_bytes_for_level_base(512)
706                .level0_sub_level_compact_level_count(1)
707                .build(),
708        );
709        let mut picker =
710            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
711
712        let ret = picker
713            .pick_compaction(&levels, &levels_handler, &mut local_stats)
714            .unwrap();
715        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
716        assert_eq!(
717            2,
718            ret.input_levels.iter().filter(|l| l.level_idx == 0).count()
719        );
720        assert_eq!(
721            3,
722            ret.input_levels
723                .iter()
724                .filter(|l| l.level_idx == 0)
725                .map(|l| l.table_infos.len())
726                .sum::<usize>()
727        );
728    }
729
730    #[test]
731    fn test_l0_to_l1_break_on_pending_sub_level() {
732        let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
733            vec![
734                generate_table(4, 1, 10, 90, 1),
735                generate_table(5, 1, 210, 220, 1),
736            ],
737            vec![generate_table(6, 1, 0, 100000, 1)],
738            vec![generate_table(7, 1, 0, 100000, 1)],
739        ]);
740
741        let levels = Levels {
742            l0,
743            levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])],
744            ..Default::default()
745        };
746        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
747        let mut local_stats = LocalPickerStatistic::default();
748
749        // Create a pending sub-level.
750        let pending_level = levels.l0.sub_levels[1].clone();
751        assert_eq!(pending_level.sub_level_id, 1);
752        let tier_task_input = CompactionInput {
753            input_levels: vec![InputLevel {
754                level_idx: 0,
755                level_type: pending_level.level_type,
756                table_infos: pending_level.table_infos.clone(),
757            }],
758            target_level: 1,
759            target_sub_level_id: pending_level.sub_level_id,
760            ..Default::default()
761        };
762        assert!(!levels_handler[0].is_level_pending_compact(&pending_level));
763        tier_task_input.add_pending_task(1, &mut levels_handler);
764        assert!(levels_handler[0].is_level_pending_compact(&pending_level));
765
766        // Pick with large max_compaction_bytes results all sub levels included in input.
767        let config = Arc::new(
768            CompactionConfigBuilder::new()
769                .max_compaction_bytes(500000)
770                .level0_sub_level_compact_level_count(2)
771                .build(),
772        );
773
774        // Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION.
775        // But stopped by pending sub-level when trying to include more sub-levels.
776        let mut picker = LevelCompactionPicker::new(
777            1,
778            config.clone(),
779            Arc::new(CompactionDeveloperConfig::default()),
780        );
781        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
782        assert!(ret.is_none());
783
784        // Free the pending sub-level.
785        for pending_task_id in &levels_handler[0].pending_tasks_ids() {
786            levels_handler[0].remove_task(*pending_task_id);
787        }
788
789        // No more pending sub-level so we can get a task now.
790        let mut picker =
791            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
792        picker
793            .pick_compaction(&levels, &levels_handler, &mut local_stats)
794            .unwrap();
795    }
796
797    #[test]
798    fn test_l0_to_base_when_all_base_pending() {
799        let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
800            vec![
801                generate_table(4, 1, 10, 90, 1),
802                generate_table(5, 1, 1000, 2000, 1),
803            ],
804            vec![generate_table(6, 1, 10, 90, 1)],
805        ]);
806
807        let levels = Levels {
808            l0,
809            levels: vec![generate_level(1, vec![generate_table(3, 1, 1, 100, 1)])],
810            ..Default::default()
811        };
812        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
813        let mut local_stats = LocalPickerStatistic::default();
814
815        let config = Arc::new(
816            CompactionConfigBuilder::new()
817                .max_compaction_bytes(500000)
818                .level0_sub_level_compact_level_count(2)
819                .sub_level_max_compaction_bytes(1000)
820                .build(),
821        );
822
823        let mut picker =
824            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
825        let ret = picker
826            .pick_compaction(&levels, &levels_handler, &mut local_stats)
827            .unwrap();
828        // 1. trivial_move
829        assert_eq!(2, ret.input_levels.len());
830        assert!(ret.input_levels[1].table_infos.is_empty());
831        assert_eq!(5, ret.input_levels[0].table_infos[0].sst_id);
832        ret.add_pending_task(0, &mut levels_handler);
833
834        let ret = picker
835            .pick_compaction(&levels, &levels_handler, &mut local_stats)
836            .unwrap();
837        assert_eq!(3, ret.input_levels.len());
838        assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id);
839    }
840}