risingwave_meta/hummock/compaction/picker/
intra_compaction_picker.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::config::meta::default::compaction_config;
18use risingwave_hummock_sdk::level::{InputLevel, Levels, OverlappingLevel};
19use risingwave_pb::hummock::{CompactionConfig, LevelType};
20
21use super::{
22    CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
23    ValidationRuleType,
24};
25use crate::hummock::compaction::picker::{NonOverlapSubLevelPicker, TrivialMovePicker};
26use crate::hummock::compaction::{CompactionDeveloperConfig, create_overlap_strategy};
27use crate::hummock::level_handler::LevelHandler;
28
/// Picker for compaction tasks that stay inside L0.
///
/// `pick_compaction` tries, in order: a trivial move between adjacent
/// sub-levels, a whole-level pick, and a multi-sub-level intra-L0 pick.
pub struct IntraCompactionPicker {
    /// Compaction configuration read by every picking strategy of this picker.
    config: Arc<CompactionConfig>,
    /// Validator handed to the whole-level picker and consulted by the intra-L0 pick.
    compaction_task_validator: Arc<CompactionTaskValidator>,

    /// Developer switches read by this picker (`enable_trivial_move`,
    /// `enable_check_task_level_overlap`).
    developer_config: Arc<CompactionDeveloperConfig>,
}
35
36impl CompactionPicker for IntraCompactionPicker {
37    fn pick_compaction(
38        &mut self,
39        levels: &Levels,
40        level_handlers: &[LevelHandler],
41        stats: &mut LocalPickerStatistic,
42    ) -> Option<CompactionInput> {
43        let l0 = &levels.l0;
44        if l0.sub_levels.is_empty() {
45            return None;
46        }
47
48        if let Some(ret) = self.pick_l0_trivial_move_file(l0, level_handlers, stats) {
49            return Some(ret);
50        }
51
52        let vnode_partition_count = self.config.split_weight_by_vnode;
53
54        if let Some(ret) =
55            self.pick_whole_level(l0, &level_handlers[0], vnode_partition_count, stats)
56        {
57            if ret.input_levels.len() < 2 {
58                tracing::error!(
59                    ?ret,
60                    vnode_partition_count,
61                    "pick_whole_level failed to pick enough levels"
62                );
63                return None;
64            }
65
66            return Some(ret);
67        }
68
69        if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], vnode_partition_count, stats)
70        {
71            if ret.input_levels.len() < 2 {
72                tracing::error!(
73                    ?ret,
74                    vnode_partition_count,
75                    "pick_l0_intra failed to pick enough levels"
76                );
77                return None;
78            }
79
80            return Some(ret);
81        }
82
83        None
84    }
85}
86
87impl IntraCompactionPicker {
88    #[cfg(test)]
89    pub fn for_test(
90        config: Arc<CompactionConfig>,
91        developer_config: Arc<CompactionDeveloperConfig>,
92    ) -> IntraCompactionPicker {
93        IntraCompactionPicker {
94            compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
95            config,
96            developer_config,
97        }
98    }
99
100    pub fn new_with_validator(
101        config: Arc<CompactionConfig>,
102        compaction_task_validator: Arc<CompactionTaskValidator>,
103        developer_config: Arc<CompactionDeveloperConfig>,
104    ) -> IntraCompactionPicker {
105        assert!(config.level0_sub_level_compact_level_count > 1);
106        IntraCompactionPicker {
107            config,
108            compaction_task_validator,
109            developer_config,
110        }
111    }
112
113    fn pick_whole_level(
114        &self,
115        l0: &OverlappingLevel,
116        level_handler: &LevelHandler,
117        partition_count: u32,
118        stats: &mut LocalPickerStatistic,
119    ) -> Option<CompactionInput> {
120        let picker = WholeLevelCompactionPicker::new(
121            self.config.clone(),
122            self.compaction_task_validator.clone(),
123        );
124        picker.pick_whole_level(l0, level_handler, partition_count, stats)
125    }
126
127    fn pick_l0_intra(
128        &self,
129        l0: &OverlappingLevel,
130        level_handler: &LevelHandler,
131        vnode_partition_count: u32,
132        stats: &mut LocalPickerStatistic,
133    ) -> Option<CompactionInput> {
134        let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
135        let mut max_vnode_partition_idx = 0;
136        for (idx, level) in l0.sub_levels.iter().enumerate() {
137            if level.vnode_partition_count < vnode_partition_count {
138                break;
139            }
140            max_vnode_partition_idx = idx;
141        }
142
143        for (idx, level) in l0.sub_levels.iter().enumerate() {
144            if level.level_type != LevelType::Nonoverlapping
145                || level.total_file_size > self.config.sub_level_max_compaction_bytes
146            {
147                continue;
148            }
149
150            if idx > max_vnode_partition_idx {
151                break;
152            }
153
154            if level_handler.is_level_all_pending_compact(level) {
155                continue;
156            }
157
158            let max_compaction_bytes = std::cmp::min(
159                self.config.max_compaction_bytes,
160                self.config.sub_level_max_compaction_bytes,
161            );
162
163            let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
164                self.config.sub_level_max_compaction_bytes / 2,
165                max_compaction_bytes,
166                self.config.level0_sub_level_compact_level_count as usize,
167                self.config.level0_max_compact_file_number,
168                overlap_strategy.clone(),
169                self.developer_config.enable_check_task_level_overlap,
170                self.config
171                    .max_l0_compact_level_count
172                    .unwrap_or(compaction_config::max_l0_compact_level_count())
173                    as usize,
174                self.config
175                    .enable_optimize_l0_interval_selection
176                    .unwrap_or(compaction_config::enable_optimize_l0_interval_selection()),
177            );
178
179            let candidate_l0_plans = non_overlap_sub_level_picker.pick_l0_multi_non_overlap_level(
180                &l0.sub_levels[idx..=max_vnode_partition_idx],
181                level_handler,
182            );
183
184            if candidate_l0_plans.is_empty() {
185                continue;
186            }
187
188            for input in candidate_l0_plans {
189                let mut task_input_size = 0;
190                let mut task_file_count = 0;
191                let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len());
192                let mut target_sub_level_id = None;
193                for (sub_level_id, level_select_sst) in input.sstable_infos {
194                    if level_select_sst.is_empty() {
195                        continue;
196                    }
197
198                    if target_sub_level_id.is_none() {
199                        target_sub_level_id = Some(sub_level_id);
200                    }
201
202                    task_input_size += level_select_sst.iter().map(|sst| sst.sst_size).sum::<u64>();
203                    task_file_count += level_select_sst.len();
204
205                    select_level_inputs.push(InputLevel {
206                        level_idx: 0,
207                        level_type: LevelType::Nonoverlapping,
208                        table_infos: level_select_sst,
209                    });
210                }
211                select_level_inputs.reverse();
212
213                let result = CompactionInput {
214                    input_levels: select_level_inputs,
215                    target_sub_level_id: target_sub_level_id.unwrap(),
216                    select_input_size: task_input_size,
217                    total_file_count: task_file_count as u64,
218                    ..Default::default()
219                };
220
221                if !self.compaction_task_validator.valid_compact_task(
222                    &result,
223                    ValidationRuleType::Intra,
224                    stats,
225                ) {
226                    continue;
227                }
228
229                return Some(result);
230            }
231        }
232
233        None
234    }
235
236    fn pick_l0_trivial_move_file(
237        &self,
238        l0: &OverlappingLevel,
239        level_handlers: &[LevelHandler],
240        stats: &mut LocalPickerStatistic,
241    ) -> Option<CompactionInput> {
242        if !self.developer_config.enable_trivial_move {
243            return None;
244        }
245
246        let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
247
248        for (idx, level) in l0.sub_levels.iter().enumerate() {
249            if level.level_type == LevelType::Overlapping || idx + 1 >= l0.sub_levels.len() {
250                continue;
251            }
252
253            if l0.sub_levels[idx + 1].level_type == LevelType::Overlapping {
254                continue;
255            }
256
257            if level_handlers[0].is_level_pending_compact(level) {
258                continue;
259            }
260
261            if l0.sub_levels[idx + 1].vnode_partition_count
262                != l0.sub_levels[idx].vnode_partition_count
263            {
264                continue;
265            }
266
267            let trivial_move_picker = TrivialMovePicker::new(
268                0,
269                0,
270                overlap_strategy.clone(),
271                0,
272                self.config
273                    .sst_allowed_trivial_move_max_count
274                    .unwrap_or(compaction_config::sst_allowed_trivial_move_max_count())
275                    as usize,
276            );
277
278            if let Some(select_ssts) = trivial_move_picker.pick_multi_trivial_move_ssts(
279                &l0.sub_levels[idx + 1].table_infos,
280                &level.table_infos,
281                level_handlers,
282                stats,
283            ) {
284                let mut overlap = overlap_strategy.create_overlap_info();
285                select_ssts
286                    .iter()
287                    .for_each(|sst| overlap.update(&sst.key_range));
288
289                assert!(
290                    overlap
291                        .check_multiple_overlap(&l0.sub_levels[idx].table_infos)
292                        .is_empty()
293                );
294
295                let select_input_size = select_ssts.iter().map(|sst| sst.sst_size).sum();
296                let total_file_count = select_ssts.len() as u64;
297                let input_levels = vec![
298                    InputLevel {
299                        level_idx: 0,
300                        level_type: LevelType::Nonoverlapping,
301                        table_infos: select_ssts,
302                    },
303                    InputLevel {
304                        level_idx: 0,
305                        level_type: LevelType::Nonoverlapping,
306                        table_infos: vec![],
307                    },
308                ];
309                return Some(CompactionInput {
310                    input_levels,
311                    target_level: 0,
312                    target_sub_level_id: level.sub_level_id,
313                    select_input_size,
314                    total_file_count,
315                    ..Default::default()
316                });
317            }
318        }
319        None
320    }
321}
322
/// Picker that merges runs of consecutive whole L0 sub-levels, starting from a
/// non-overlapping sub-level whose `vnode_partition_count` differs from the
/// requested partition count.
pub struct WholeLevelCompactionPicker {
    /// Compaction configuration providing the size and file-count limits.
    config: Arc<CompactionConfig>,
    /// Validator consulted when the size/count limits were not reached.
    compaction_task_validator: Arc<CompactionTaskValidator>,
}
327
328impl WholeLevelCompactionPicker {
329    pub fn new(
330        config: Arc<CompactionConfig>,
331        compaction_task_validator: Arc<CompactionTaskValidator>,
332    ) -> Self {
333        Self {
334            config,
335            compaction_task_validator,
336        }
337    }
338
339    pub fn pick_whole_level(
340        &self,
341        l0: &OverlappingLevel,
342        level_handler: &LevelHandler,
343        partition_count: u32,
344        stats: &mut LocalPickerStatistic,
345    ) -> Option<CompactionInput> {
346        if partition_count == 0 {
347            return None;
348        }
349        for (idx, level) in l0.sub_levels.iter().enumerate() {
350            if level.level_type != LevelType::Nonoverlapping
351                || level.vnode_partition_count == partition_count
352            {
353                continue;
354            }
355
356            let max_compaction_bytes = std::cmp::max(
357                self.config.max_bytes_for_level_base,
358                self.config.sub_level_max_compaction_bytes
359                    * (self.config.level0_sub_level_compact_level_count as u64),
360            );
361
362            let mut select_input_size = 0;
363
364            let mut select_level_inputs = vec![];
365            let mut total_file_count = 0;
366            let mut wait_enough = false;
367            for next_level in l0.sub_levels.iter().skip(idx) {
368                if (select_input_size > max_compaction_bytes
369                    || total_file_count > self.config.level0_max_compact_file_number
370                    || next_level.vnode_partition_count == partition_count)
371                    && select_level_inputs.len() > 1
372                {
373                    wait_enough = true;
374                    break;
375                }
376
377                if level_handler.is_level_pending_compact(next_level) {
378                    break;
379                }
380
381                select_input_size += next_level.total_file_size;
382                total_file_count += next_level.table_infos.len() as u64;
383
384                select_level_inputs.push(InputLevel {
385                    level_idx: 0,
386                    level_type: next_level.level_type,
387                    table_infos: next_level.table_infos.clone(),
388                });
389            }
390            if select_level_inputs.len() > 1 {
391                let vnode_partition_count =
392                    if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
393                        partition_count
394                    } else {
395                        0
396                    };
397                let result = CompactionInput {
398                    input_levels: select_level_inputs,
399                    target_sub_level_id: level.sub_level_id,
400                    select_input_size,
401                    total_file_count,
402                    vnode_partition_count,
403                    ..Default::default()
404                };
405                if wait_enough
406                    || self.compaction_task_validator.valid_compact_task(
407                        &result,
408                        ValidationRuleType::Intra,
409                        stats,
410                    )
411                {
412                    return Some(result);
413                }
414            }
415        }
416
417        None
418    }
419}
420
421#[cfg(test)]
422pub mod tests {
423    use risingwave_hummock_sdk::level::Level;
424
425    use super::*;
426    use crate::hummock::compaction::TierCompactionPicker;
427    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
428    use crate::hummock::compaction::selector::tests::{
429        generate_l0_nonoverlapping_multi_sublevels, generate_l0_nonoverlapping_sublevels,
430        generate_l0_overlapping_sublevels, generate_level, generate_table,
431        push_table_level0_overlapping, push_tables_level0_nonoverlapping,
432    };
433
434    fn create_compaction_picker_for_test() -> IntraCompactionPicker {
435        let config = Arc::new(
436            CompactionConfigBuilder::new()
437                .level0_tier_compact_file_number(2)
438                .level0_sub_level_compact_level_count(1)
439                .build(),
440        );
441        IntraCompactionPicker::for_test(config, Arc::new(CompactionDeveloperConfig::default()))
442    }
443
444    #[test]
445    fn test_l0_to_l1_compact_conflict() {
446        // When picking L0->L1, L0's selecting_key_range should not be overlapped with L0's
447        // compacting_key_range.
448        let levels = vec![Level {
449            level_idx: 1,
450            level_type: LevelType::Nonoverlapping,
451            table_infos: vec![],
452            ..Default::default()
453        }];
454        let mut levels = Levels {
455            levels,
456            l0: OverlappingLevel {
457                sub_levels: vec![],
458                total_file_size: 0,
459                uncompressed_file_size: 0,
460            },
461            ..Default::default()
462        };
463        push_tables_level0_nonoverlapping(
464            &mut levels,
465            vec![
466                generate_table(1, 1, 100, 300, 2),
467                generate_table(2, 1, 350, 500, 2),
468            ],
469        );
470        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
471
472        let mut local_stats = LocalPickerStatistic::default();
473        push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]);
474        let config: CompactionConfig = CompactionConfigBuilder::new()
475            .level0_tier_compact_file_number(2)
476            .max_compaction_bytes(1000)
477            .sub_level_max_compaction_bytes(150)
478            .max_bytes_for_level_multiplier(1)
479            .level0_sub_level_compact_level_count(3)
480            .build();
481        let mut picker = TierCompactionPicker::new(Arc::new(config));
482
483        let ret: Option<CompactionInput> =
484            picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
485        assert!(ret.is_none());
486    }
487
488    #[test]
489    fn test_compacting_key_range_overlap_intra_l0() {
490        // When picking L0->L0, L0's selecting_key_range should not be overlapped with L0's
491        // compacting_key_range.
492        let mut picker = create_compaction_picker_for_test();
493
494        let mut levels = Levels {
495            levels: vec![Level {
496                level_idx: 1,
497                level_type: LevelType::Nonoverlapping,
498                table_infos: vec![generate_table(3, 1, 200, 300, 2)],
499                ..Default::default()
500            }],
501            l0: generate_l0_nonoverlapping_sublevels(vec![
502                generate_table(1, 1, 100, 210, 2),
503                generate_table(2, 1, 200, 250, 2),
504            ]),
505            ..Default::default()
506        };
507        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
508
509        let mut local_stats = LocalPickerStatistic::default();
510        let ret = picker
511            .pick_compaction(&levels, &levels_handler, &mut local_stats)
512            .unwrap();
513        ret.add_pending_task(0, &mut levels_handler);
514
515        push_table_level0_overlapping(&mut levels, generate_table(4, 1, 170, 180, 3));
516        assert!(
517            picker
518                .pick_compaction(&levels, &levels_handler, &mut local_stats)
519                .is_none()
520        );
521    }
522
    /// Exercises intra-L0 picking over four independent fixtures. Each scoped
    /// block builds its own levels, handlers, config and picker.
    #[test]
    fn test_pick_l0_intra() {
        // Scenario 1: three non-overlapping sub-levels; the picked task must
        // contain exactly 3 SSTs in total.
        {
            // NOTE(review): sst id 6 appears in both the first and the second
            // sub-level of this fixture — confirm that is intended.
            let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
                vec![
                    generate_table(6, 1, 50, 99, 1),
                    generate_table(1, 1, 100, 200, 1),
                    generate_table(2, 1, 250, 300, 1),
                ],
                vec![
                    generate_table(3, 1, 10, 90, 1),
                    generate_table(6, 1, 100, 110, 1),
                ],
                vec![
                    generate_table(4, 1, 50, 99, 1),
                    generate_table(5, 1, 100, 200, 1),
                ],
            ]);
            let levels = Levels {
                l0,
                levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
                ..Default::default()
            };
            let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
            // Register the L1 tables with the L1 handler as part of a pending task.
            levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
            let config = Arc::new(
                CompactionConfigBuilder::new()
                    .level0_sub_level_compact_level_count(1)
                    .level0_overlapping_sub_level_compact_level_count(4)
                    .build(),
            );
            let mut picker = IntraCompactionPicker::for_test(
                config,
                Arc::new(CompactionDeveloperConfig::default()),
            );
            let mut local_stats = LocalPickerStatistic::default();
            let ret = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();
            ret.add_pending_task(1, &mut levels_handler);
            assert_eq!(
                ret.input_levels
                    .iter()
                    .map(|i| i.table_infos.len())
                    .sum::<usize>(),
                3
            );
        }

        // Scenario 2: two consecutive picks on the same fixture; the first pick
        // is registered as pending before the second one is made.
        {
            // Suppose keyguard [100, 200] [300, 400]
            // will pick sst [1, 3, 4]
            let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
                vec![
                    generate_table(1, 1, 100, 200, 1),
                    generate_table(2, 1, 300, 400, 1),
                ],
                vec![
                    generate_table(3, 1, 100, 200, 1),
                    generate_table(6, 1, 300, 500, 1),
                ],
                vec![
                    generate_table(4, 1, 100, 200, 1),
                    generate_table(5, 1, 300, 400, 1),
                ],
            ]);
            let levels = Levels {
                l0,
                levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
                ..Default::default()
            };
            let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
            levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
            let config = Arc::new(
                CompactionConfigBuilder::new()
                    .level0_sub_level_compact_level_count(1)
                    .sub_level_max_compaction_bytes(300)
                    .build(),
            );
            let mut picker = IntraCompactionPicker::for_test(
                config,
                Arc::new(CompactionDeveloperConfig::default()),
            );
            let mut local_stats = LocalPickerStatistic::default();
            let ret = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();
            ret.add_pending_task(1, &mut levels_handler);
            assert_eq!(
                ret.input_levels
                    .iter()
                    .map(|i| i.table_infos.len())
                    .sum::<usize>(),
                3
            );

            // Input levels come back with the last sub-level first.
            assert_eq!(4, ret.input_levels[0].table_infos[0].sst_id);
            assert_eq!(3, ret.input_levels[1].table_infos[0].sst_id);
            assert_eq!(1, ret.input_levels[2].table_infos[0].sst_id);

            // will pick sst [2, 6]
            let ret2 = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();

            assert_eq!(
                ret2.input_levels
                    .iter()
                    .map(|i| i.table_infos.len())
                    .sum::<usize>(),
                2
            );

            assert_eq!(6, ret2.input_levels[0].table_infos[0].sst_id);
            assert_eq!(2, ret2.input_levels[1].table_infos[0].sst_id);
        }

        // Scenario 3: finer-grained key ranges; again two consecutive picks with
        // the first one registered as pending.
        {
            let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
                vec![
                    generate_table(1, 1, 100, 149, 1),
                    generate_table(6, 1, 150, 199, 1),
                    generate_table(7, 1, 200, 250, 1),
                    generate_table(2, 1, 300, 400, 1),
                ],
                vec![
                    generate_table(3, 1, 100, 149, 1),
                    generate_table(8, 1, 150, 199, 1),
                    generate_table(9, 1, 200, 250, 1),
                    generate_table(10, 1, 300, 400, 1),
                ],
                vec![
                    generate_table(4, 1, 100, 199, 1),
                    generate_table(11, 1, 200, 250, 1),
                    generate_table(5, 1, 300, 350, 1),
                ],
            ]);
            let levels = Levels {
                l0,
                levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
                ..Default::default()
            };
            let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
            levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
            let config = Arc::new(
                CompactionConfigBuilder::new()
                    .level0_sub_level_compact_level_count(1)
                    .sub_level_max_compaction_bytes(300)
                    .build(),
            );
            let mut picker = IntraCompactionPicker::for_test(
                config,
                Arc::new(CompactionDeveloperConfig::default()),
            );
            let mut local_stats = LocalPickerStatistic::default();
            let ret = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();
            ret.add_pending_task(1, &mut levels_handler);
            assert_eq!(
                ret.input_levels
                    .iter()
                    .map(|i| i.table_infos.len())
                    .sum::<usize>(),
                3
            );

            // First pick: the three SSTs covering [200, 250], one per sub-level.
            assert_eq!(11, ret.input_levels[0].table_infos[0].sst_id);
            assert_eq!(9, ret.input_levels[1].table_infos[0].sst_id);
            assert_eq!(7, ret.input_levels[2].table_infos[0].sst_id);

            let ret2 = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();

            assert_eq!(
                ret2.input_levels
                    .iter()
                    .map(|i| i.table_infos.len())
                    .sum::<usize>(),
                3
            );

            // Second pick: the SSTs whose ranges start at 300, one per sub-level.
            assert_eq!(5, ret2.input_levels[0].table_infos[0].sst_id);
            assert_eq!(10, ret2.input_levels[1].table_infos[0].sst_id);
            assert_eq!(2, ret2.input_levels[2].table_infos[0].sst_id);
        }

        // Scenario 4: calls `pick_l0_intra` directly and cross-checks the
        // reported totals against the tables actually returned.
        {
            // Two candidates exist; ensure size/count are not accumulated across candidates.
            let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
                vec![
                    generate_table(1, 1, 0, 50, 1),    // seed 1
                    generate_table(2, 1, 100, 150, 1), // seed 2
                ],
                vec![generate_table(3, 1, 0, 150, 1)], // overlaps both seeds
            ]);
            let levels = Levels {
                l0,
                levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
                ..Default::default()
            };
            let levels_handler = [LevelHandler::new(0), LevelHandler::new(1)];
            let config = Arc::new(
                CompactionConfigBuilder::new()
                    .level0_sub_level_compact_level_count(2)
                    .level0_max_compact_file_number(2)
                    .sub_level_max_compaction_bytes(500)
                    .max_compaction_bytes(300) // single candidate fits; accumulated would exceed
                    .build(),
            );
            let picker = IntraCompactionPicker::for_test(
                config,
                Arc::new(CompactionDeveloperConfig::default()),
            );
            let mut local_stats = LocalPickerStatistic::default();
            // Use the first sub-level's own partition count as the requested one.
            let ret = picker
                .pick_l0_intra(
                    &levels.l0,
                    &levels_handler[0],
                    levels.l0.sub_levels[0].vnode_partition_count,
                    &mut local_stats,
                )
                .unwrap();

            // Ensure we can pick a candidate and the returned size/count matches the tables chosen.
            assert_eq!(ret.input_levels.len(), 2);

            let actual_file_count: usize = ret
                .input_levels
                .iter()
                .map(|lvl| lvl.table_infos.len())
                .sum();
            assert_eq!(ret.total_file_count as usize, actual_file_count);

            let expect_size: u64 = ret
                .input_levels
                .iter()
                .flat_map(|lvl| lvl.table_infos.iter())
                .map(|sst| sst.sst_size)
                .sum();
            assert_eq!(ret.select_input_size, expect_size);
        }
    }
767
768    fn is_l0_trivial_move(compaction_input: &CompactionInput) -> bool {
769        compaction_input.input_levels.len() == 2
770            && !compaction_input.input_levels[0].table_infos.is_empty()
771            && compaction_input.input_levels[1].table_infos.is_empty()
772    }
773
    /// Walks one picker through several L0 layouts and checks that a trivial
    /// move is produced only when two adjacent sub-levels are both
    /// non-overlapping. The later steps mutate `levels` in place, so their order
    /// matters.
    #[test]
    fn test_trivial_move() {
        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
        let config = Arc::new(
            CompactionConfigBuilder::new()
                .level0_tier_compact_file_number(2)
                .target_file_size_base(30)
                .level0_sub_level_compact_level_count(20) // reject intra
                .build(),
        );
        let mut picker =
            IntraCompactionPicker::for_test(config, Arc::new(CompactionDeveloperConfig::default()));

        // Cannot trivial move because there is only 1 sub-level.
        let l0 = generate_l0_overlapping_sublevels(vec![vec![
            generate_table(1, 1, 100, 110, 1),
            generate_table(2, 1, 150, 250, 1),
        ]]);
        let levels = Levels {
            l0,
            levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
            ..Default::default()
        };
        // Register the L1 tables with the L1 handler as part of a pending task.
        levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
        let mut local_stats = LocalPickerStatistic::default();
        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
        assert!(ret.is_none());

        // Cannot trivial move because sub-levels are overlapping
        let l0: OverlappingLevel = generate_l0_overlapping_sublevels(vec![
            vec![
                generate_table(1, 1, 100, 110, 1),
                generate_table(2, 1, 150, 250, 1),
            ],
            vec![generate_table(3, 1, 10, 90, 1)],
            vec![generate_table(4, 1, 10, 90, 1)],
            vec![generate_table(5, 1, 10, 90, 1)],
        ]);
        let mut levels = Levels {
            l0,
            levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
            ..Default::default()
        };
        assert!(
            picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .is_none()
        );

        // Cannot trivial move because latter sub-level is overlapping
        levels.l0.sub_levels[0].level_type = LevelType::Nonoverlapping;
        levels.l0.sub_levels[1].level_type = LevelType::Overlapping;
        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
        assert!(ret.is_none());

        // Cannot trivial move because former sub-level is overlapping
        levels.l0.sub_levels[0].level_type = LevelType::Overlapping;
        levels.l0.sub_levels[1].level_type = LevelType::Nonoverlapping;
        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
        assert!(ret.is_none());

        // trivial move
        levels.l0.sub_levels[0].level_type = LevelType::Nonoverlapping;
        levels.l0.sub_levels[1].level_type = LevelType::Nonoverlapping;
        let ret = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        // Exactly one SST is expected to be moved.
        assert!(is_l0_trivial_move(&ret));
        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
    }
844    #[test]
845    fn test_pick_whole_level() {
846        let config = Arc::new(
847            CompactionConfigBuilder::new()
848                .level0_max_compact_file_number(20)
849                .build(),
850        );
851        let mut table_infos = vec![];
852        for epoch in 1..3 {
853            let base = epoch * 100;
854            let mut ssts = vec![];
855            for i in 1..50 {
856                let left = (i as usize) * 100;
857                let right = left + 100;
858                ssts.push(generate_table(base + i, 1, left, right, epoch));
859            }
860            table_infos.push(ssts);
861        }
862
863        let l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
864        let compaction_task_validator = Arc::new(CompactionTaskValidator::new(config.clone()));
865        let picker = WholeLevelCompactionPicker::new(config, compaction_task_validator);
866        let level_handler = LevelHandler::new(0);
867        let ret = picker
868            .pick_whole_level(&l0, &level_handler, 4, &mut LocalPickerStatistic::default())
869            .unwrap();
870        assert_eq!(ret.input_levels.len(), 2);
871    }
872
873    #[test]
874    fn test_priority() {
875        let config = Arc::new(
876            CompactionConfigBuilder::new()
877                .level0_max_compact_file_number(20)
878                .sub_level_max_compaction_bytes(1)
879                .level0_sub_level_compact_level_count(2)
880                .build(),
881        );
882        let mut table_infos = vec![];
883        for epoch in 1..3 {
884            let base = epoch * 100;
885            let mut ssts = vec![];
886            for i in 1..50 {
887                let left = (i as usize) * 100;
888                let right = left + 100;
889                ssts.push(generate_table(base + i, 1, left, right, epoch));
890            }
891            table_infos.push(ssts);
892        }
893
894        let mut l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
895        // trivial-move
896        l0.sub_levels[1]
897            .table_infos
898            .push(generate_table(9999, 900000000, 0, 100, 1));
899
900        l0.sub_levels[0].total_file_size = 1;
901        l0.sub_levels[1].total_file_size = 1;
902
903        let mut picker = IntraCompactionPicker::new_with_validator(
904            config,
905            Arc::new(CompactionTaskValidator::unused()),
906            Arc::new(CompactionDeveloperConfig::default()),
907        );
908        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
909        let mut local_stats = LocalPickerStatistic::default();
910
911        let levels = Levels {
912            l0,
913            levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
914            ..Default::default()
915        };
916
917        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
918        assert!(is_l0_trivial_move(ret.as_ref().unwrap()));
919        ret.as_ref()
920            .unwrap()
921            .add_pending_task(1, &mut levels_handler);
922        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
923        assert!(ret.is_some());
924        let input = ret.as_ref().unwrap();
925        assert_eq!(input.input_levels.len(), 2);
926        assert_ne!(
927            levels.l0.sub_levels[0].table_infos.len(),
928            input.input_levels[0].table_infos.len()
929        );
930        assert_ne!(
931            levels.l0.sub_levels[1].table_infos.len(),
932            input.input_levels[1].table_infos.len()
933        );
934    }
935}