1use std::cell::RefCell;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::config::meta::default::compaction_config;
20use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
21use risingwave_pb::hummock::{CompactionConfig, LevelType};
22
23use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
24use super::{
25 CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
26 ValidationRuleType,
27};
28use crate::hummock::compaction::picker::TrivialMovePicker;
29use crate::hummock::compaction::{CompactionDeveloperConfig, create_overlap_strategy};
30use crate::hummock::level_handler::LevelHandler;
31
32std::thread_local! {
33 static LOG_COUNTER: RefCell<usize> = const { RefCell::new(0) };
34}
35
36pub struct LevelCompactionPicker {
37 target_level: usize,
38 config: Arc<CompactionConfig>,
39 compaction_task_validator: Arc<CompactionTaskValidator>,
40 developer_config: Arc<CompactionDeveloperConfig>,
41}
42
43impl CompactionPicker for LevelCompactionPicker {
44 fn pick_compaction(
45 &mut self,
46 levels: &Levels,
47 level_handlers: &[LevelHandler],
48 stats: &mut LocalPickerStatistic,
49 ) -> Option<CompactionInput> {
50 let l0 = &levels.l0;
51 if l0.sub_levels.is_empty() {
52 return None;
53 }
54 if l0.sub_levels[0].level_type != LevelType::Nonoverlapping
55 && l0.sub_levels[0].table_infos.len() > 1
56 {
57 stats.skip_by_overlapping += 1;
58 return None;
59 }
60
61 let is_l0_pending_compact =
62 level_handlers[0].is_level_all_pending_compact(&l0.sub_levels[0]);
63
64 if is_l0_pending_compact {
65 stats.skip_by_pending_files += 1;
66 return None;
67 }
68
69 if let Some(mut ret) = self.pick_base_trivial_move(
70 l0,
71 levels.get_level(self.target_level),
72 level_handlers,
73 stats,
74 ) {
75 ret.vnode_partition_count = self.config.split_weight_by_vnode;
76 return Some(ret);
77 }
78
79 debug_assert!(self.target_level == levels.get_level(self.target_level).level_idx as usize);
80 if let Some(ret) = self.pick_multi_level_to_base(
81 l0,
82 levels.get_level(self.target_level),
83 self.config.split_weight_by_vnode,
84 level_handlers,
85 stats,
86 ) {
87 return Some(ret);
88 }
89
90 None
91 }
92}
93
94impl LevelCompactionPicker {
95 #[cfg(test)]
96 pub fn new(
97 target_level: usize,
98 config: Arc<CompactionConfig>,
99 developer_config: Arc<CompactionDeveloperConfig>,
100 ) -> LevelCompactionPicker {
101 LevelCompactionPicker {
102 target_level,
103 compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
104 config,
105 developer_config,
106 }
107 }
108
109 pub fn new_with_validator(
110 target_level: usize,
111 config: Arc<CompactionConfig>,
112 compaction_task_validator: Arc<CompactionTaskValidator>,
113 developer_config: Arc<CompactionDeveloperConfig>,
114 ) -> LevelCompactionPicker {
115 LevelCompactionPicker {
116 target_level,
117 config,
118 compaction_task_validator,
119 developer_config,
120 }
121 }
122
123 fn pick_base_trivial_move(
124 &self,
125 l0: &OverlappingLevel,
126 target_level: &Level,
127 level_handlers: &[LevelHandler],
128 stats: &mut LocalPickerStatistic,
129 ) -> Option<CompactionInput> {
130 if !self.developer_config.enable_trivial_move {
131 return None;
132 }
133
134 let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
135 let trivial_move_picker = TrivialMovePicker::new(
136 0,
137 self.target_level,
138 overlap_strategy.clone(),
139 if self.compaction_task_validator.is_enable() {
140 self.config.sst_allowed_trivial_move_min_size.unwrap_or(0)
142 } else {
143 0
144 },
145 self.config
146 .sst_allowed_trivial_move_max_count
147 .unwrap_or(compaction_config::sst_allowed_trivial_move_max_count())
148 as usize,
149 );
150
151 trivial_move_picker.pick_trivial_move_task(
152 &l0.sub_levels[0].table_infos,
153 &target_level.table_infos,
154 level_handlers,
155 stats,
156 )
157 }
158
159 fn pick_multi_level_to_base(
160 &self,
161 l0: &OverlappingLevel,
162 target_level: &Level,
163 vnode_partition_count: u32,
164 level_handlers: &[LevelHandler],
165 stats: &mut LocalPickerStatistic,
166 ) -> Option<CompactionInput> {
167 let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
168 let min_compaction_bytes = self.config.sub_level_max_compaction_bytes;
169 let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
170 min_compaction_bytes,
171 std::cmp::max(
174 self.config.max_bytes_for_level_base,
175 self.config.max_compaction_bytes / 2,
176 ),
177 1,
178 self.config.level0_max_compact_file_number,
180 overlap_strategy.clone(),
181 self.developer_config.enable_check_task_level_overlap,
182 self.config
183 .max_l0_compact_level_count
184 .unwrap_or(compaction_config::max_l0_compact_level_count()) as usize,
185 self.config
186 .enable_optimize_l0_interval_selection
187 .unwrap_or(compaction_config::enable_optimize_l0_interval_selection()),
188 );
189
190 let mut max_vnode_partition_idx = 0;
191 for (idx, level) in l0.sub_levels.iter().enumerate() {
192 if level.vnode_partition_count < vnode_partition_count {
193 break;
194 }
195 max_vnode_partition_idx = idx;
196 }
197
198 let l0_select_tables_vec = non_overlap_sub_level_picker.pick_l0_multi_non_overlap_level(
199 &l0.sub_levels[..=max_vnode_partition_idx],
200 &level_handlers[0],
201 );
202 if l0_select_tables_vec.is_empty() {
203 stats.skip_by_pending_files += 1;
204 return None;
205 }
206
207 let mut skip_by_pending = false;
208 let mut input_levels = vec![];
209
210 for input in l0_select_tables_vec {
211 let l0_select_tables = input
212 .sstable_infos
213 .iter()
214 .flat_map(|(_, select_tables)| select_tables.clone())
215 .collect_vec();
216
217 let target_level_ssts = overlap_strategy
218 .check_base_level_overlap(&l0_select_tables, &target_level.table_infos);
219
220 let mut target_level_size = 0;
221 let mut pending_compact = false;
222 for sst in &target_level_ssts {
223 if level_handlers[target_level.level_idx as usize].is_pending_compact(&sst.sst_id) {
224 pending_compact = true;
225 break;
226 }
227
228 target_level_size += sst.sst_size;
229 }
230
231 if pending_compact {
232 skip_by_pending = true;
233 continue;
234 }
235
236 input_levels.push((input, target_level_size, target_level_ssts));
237 }
238
239 if input_levels.is_empty() {
240 if skip_by_pending {
241 stats.skip_by_pending_files += 1;
242 }
243 return None;
244 }
245
246 for (input, target_file_size, target_level_files) in input_levels {
247 let mut select_level_inputs = input
248 .sstable_infos
249 .into_iter()
250 .map(|(_, table_infos)| InputLevel {
251 level_idx: 0,
252 level_type: LevelType::Nonoverlapping,
253 table_infos,
254 })
255 .collect_vec();
256 select_level_inputs.reverse();
257 let target_file_count = target_level_files.len();
258 select_level_inputs.push(InputLevel {
259 level_idx: target_level.level_idx,
260 level_type: target_level.level_type,
261 table_infos: target_level_files,
262 });
263
264 let result = CompactionInput {
265 input_levels: select_level_inputs,
266 target_level: self.target_level,
267 select_input_size: input.total_file_size,
268 target_input_size: target_file_size,
269 total_file_count: (input.total_file_count + target_file_count) as u64,
270 vnode_partition_count,
271 ..Default::default()
272 };
273
274 if !self.compaction_task_validator.valid_compact_task(
275 &result,
276 ValidationRuleType::ToBase,
277 stats,
278 ) {
279 if l0.total_file_size > target_level.total_file_size * 8 {
280 let log_counter = LOG_COUNTER.with_borrow_mut(|counter| {
281 *counter += 1;
282 *counter
283 });
284
285 if log_counter.is_multiple_of(100) {
287 tracing::warn!(
288 "skip task with level count: {}, file count: {}, select size: {}, target size: {}, target level size: {}",
289 result.input_levels.len(),
290 result.total_file_count,
291 result.select_input_size,
292 result.target_input_size,
293 target_level.total_file_size,
294 );
295 }
296 }
297 continue;
298 }
299
300 return Some(result);
301 }
302 None
303 }
304}
305
306#[cfg(test)]
307pub mod tests {
308
309 use super::*;
310 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
311 use crate::hummock::compaction::selector::tests::*;
312 use crate::hummock::compaction::{CompactionMode, TierCompactionPicker};
313
314 fn create_compaction_picker_for_test() -> LevelCompactionPicker {
315 let config = Arc::new(
316 CompactionConfigBuilder::new()
317 .level0_tier_compact_file_number(2)
318 .level0_sub_level_compact_level_count(1)
319 .build(),
320 );
321 LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()))
322 }
323
324 #[test]
325 fn test_compact_l0_to_l1() {
326 let mut picker = create_compaction_picker_for_test();
327 let l0 = generate_level(
328 0,
329 vec![
330 generate_table(5, 1, 100, 200, 2),
331 generate_table(4, 1, 201, 300, 2),
332 ],
333 );
334 let mut levels = Levels {
335 l0: OverlappingLevel {
336 total_file_size: l0.total_file_size,
337 uncompressed_file_size: l0.total_file_size,
338 sub_levels: vec![l0],
339 },
340 levels: vec![generate_level(
341 1,
342 vec![
343 generate_table(3, 1, 1, 100, 1),
344 generate_table(2, 1, 101, 150, 1),
345 generate_table(1, 1, 201, 210, 1),
346 ],
347 )],
348 ..Default::default()
349 };
350 let mut local_stats = LocalPickerStatistic::default();
351 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
352
353 let ret = picker
354 .pick_compaction(&levels, &levels_handler, &mut local_stats)
355 .unwrap();
356 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
357 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 4);
358 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 1);
359
360 ret.add_pending_task(0, &mut levels_handler);
361 {
362 push_table_level0_nonoverlapping(&mut levels, generate_table(6, 1, 100, 200, 2));
363 push_table_level0_nonoverlapping(&mut levels, generate_table(7, 1, 301, 333, 4));
364 let ret2 = picker
365 .pick_compaction(&levels, &levels_handler, &mut local_stats)
366 .unwrap();
367
368 assert_eq!(ret2.input_levels[0].table_infos.len(), 1);
369 assert_eq!(ret2.input_levels[0].table_infos[0].sst_id, 6);
370 assert_eq!(ret2.input_levels[1].table_infos[0].sst_id, 5);
371 }
372
373 levels.l0.sub_levels[0]
374 .table_infos
375 .retain(|table| table.sst_id != 4);
376 levels.l0.total_file_size -= ret.input_levels[0].table_infos[0].file_size;
377
378 levels_handler[0].remove_task(0);
379 levels_handler[1].remove_task(0);
380
381 let ret = picker
382 .pick_compaction(&levels, &levels_handler, &mut local_stats)
383 .unwrap();
384 assert_eq!(ret.input_levels.len(), 3);
385 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
386 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
387 assert_eq!(ret.input_levels[2].table_infos.len(), 2);
388 assert_eq!(ret.input_levels[2].table_infos[0].sst_id, 3);
389 assert_eq!(ret.input_levels[2].table_infos[1].sst_id, 2);
390 ret.add_pending_task(1, &mut levels_handler);
391
392 let mut local_stats = LocalPickerStatistic::default();
393 push_table_level0_overlapping(&mut levels, generate_table(8, 1, 199, 233, 3));
396 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
397 assert!(ret.is_none());
398
399 levels_handler[0].remove_task(1);
401 levels_handler[1].remove_task(1);
402 let ret = picker
403 .pick_compaction(&levels, &levels_handler, &mut local_stats)
404 .unwrap();
405 assert_eq!(ret.input_levels.len(), 3);
406 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
407 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
408 assert_eq!(ret.input_levels[2].table_infos.len(), 2);
409 }
410
411 #[test]
412 fn test_selecting_key_range_overlap() {
413 let config = Arc::new(
415 CompactionConfigBuilder::new()
416 .level0_tier_compact_file_number(2)
417 .compaction_mode(CompactionMode::Range as i32)
418 .level0_sub_level_compact_level_count(1)
419 .enable_optimize_l0_interval_selection(Some(false))
420 .build(),
421 );
422
423 let config_enable_optimize_l0_interval_selection = Arc::new(
424 CompactionConfigBuilder::new()
425 .level0_tier_compact_file_number(2)
426 .compaction_mode(CompactionMode::Range as i32)
427 .level0_sub_level_compact_level_count(1)
428 .enable_optimize_l0_interval_selection(Some(true))
429 .build(),
430 );
431
432 let mut picker =
433 LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
434
435 let mut picker_enable_optimize_l0_interval_selection = LevelCompactionPicker::new(
436 1,
437 config_enable_optimize_l0_interval_selection,
438 Arc::new(CompactionDeveloperConfig::default()),
439 );
440
441 let levels = vec![Level {
442 level_idx: 1,
443 level_type: LevelType::Nonoverlapping,
444 table_infos: vec![
445 generate_table(3, 1, 0, 50, 1),
446 generate_table(4, 1, 150, 180, 1),
447 generate_table(5, 1, 250, 300, 1),
448 ],
449 ..Default::default()
450 }];
451 let mut levels = Levels {
452 levels,
453 l0: OverlappingLevel {
454 sub_levels: vec![],
455 total_file_size: 0,
456 uncompressed_file_size: 0,
457 },
458 ..Default::default()
459 };
460 push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(1, 1, 50, 140, 2)]);
461 push_tables_level0_nonoverlapping(
462 &mut levels,
463 vec![
464 generate_table(7, 1, 200, 250, 2),
465 generate_table(8, 1, 400, 500, 2),
466 ],
467 );
468
469 {
470 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
471
472 let mut local_stats = LocalPickerStatistic::default();
473 let ret = picker_enable_optimize_l0_interval_selection
474 .pick_compaction(&levels, &levels_handler, &mut local_stats)
475 .unwrap();
476
477 assert_eq!(ret.input_levels.len(), 2);
480 assert_eq!(
481 ret.input_levels[0]
482 .table_infos
483 .iter()
484 .map(|t| t.sst_id)
485 .collect_vec(),
486 vec![8]
487 );
488 assert!(ret.input_levels[1].table_infos.is_empty());
490
491 ret.add_pending_task(0, &mut levels_handler);
492
493 let ret = picker_enable_optimize_l0_interval_selection
494 .pick_compaction(&levels, &levels_handler, &mut local_stats)
495 .unwrap();
496
497 assert_eq!(ret.input_levels.len(), 2);
498 assert_eq!(
499 ret.input_levels[0]
500 .table_infos
501 .iter()
502 .map(|t| t.sst_id)
503 .collect_vec(),
504 vec![7]
505 );
506
507 assert_eq!(
508 ret.input_levels[1]
509 .table_infos
510 .iter()
511 .map(|t| t.sst_id)
512 .collect_vec(),
513 vec![5]
514 );
515 }
516
517 {
518 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
519
520 let mut local_stats = LocalPickerStatistic::default();
521 let ret = picker
522 .pick_compaction(&levels, &levels_handler, &mut local_stats)
523 .unwrap();
524
525 assert_eq!(ret.input_levels.len(), 2);
527 assert_eq!(
528 ret.input_levels[0]
529 .table_infos
530 .iter()
531 .map(|t| t.sst_id.inner())
532 .collect_vec(),
533 vec![1]
534 );
535
536 assert_eq!(
537 ret.input_levels[1]
538 .table_infos
539 .iter()
540 .map(|t| t.sst_id.inner())
541 .collect_vec(),
542 vec![3]
543 );
544 }
545 }
546
547 #[test]
548 fn test_l0_to_l1_compact_conflict() {
549 let mut picker = create_compaction_picker_for_test();
552 let levels = vec![Level {
553 level_idx: 1,
554 level_type: LevelType::Nonoverlapping,
555 table_infos: vec![],
556 total_file_size: 0,
557 sub_level_id: 0,
558 uncompressed_file_size: 0,
559 ..Default::default()
560 }];
561 let mut levels = Levels {
562 levels,
563 l0: OverlappingLevel {
564 sub_levels: vec![],
565 total_file_size: 0,
566 uncompressed_file_size: 0,
567 },
568 ..Default::default()
569 };
570 push_tables_level0_nonoverlapping(
571 &mut levels,
572 vec![
573 generate_table(1, 1, 100, 300, 2),
574 generate_table(2, 1, 350, 500, 2),
575 ],
576 );
577 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
578
579 let mut local_stats = LocalPickerStatistic::default();
580 let ret = picker
581 .pick_compaction(&levels, &levels_handler, &mut local_stats)
582 .unwrap();
583 ret.add_pending_task(0, &mut levels_handler); push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]);
586 let config: CompactionConfig = CompactionConfigBuilder::new()
587 .level0_tier_compact_file_number(2)
588 .max_compaction_bytes(1000)
589 .sub_level_max_compaction_bytes(150)
590 .max_bytes_for_level_multiplier(1)
591 .level0_sub_level_compact_level_count(3)
592 .build();
593 let mut picker = TierCompactionPicker::new(Arc::new(config));
594
595 let ret: Option<CompactionInput> =
596 picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
597 assert!(ret.is_none());
598 }
599
600 #[test]
601 fn test_skip_compact_write_amplification_limit() {
602 let config: CompactionConfig = CompactionConfigBuilder::new()
603 .level0_tier_compact_file_number(2)
604 .max_compaction_bytes(1000)
605 .sub_level_max_compaction_bytes(150)
606 .max_bytes_for_level_multiplier(1)
607 .level0_sub_level_compact_level_count(2)
608 .build();
609 let mut picker = LevelCompactionPicker::new(
610 1,
611 Arc::new(config),
612 Arc::new(CompactionDeveloperConfig::default()),
613 );
614
615 let mut levels = Levels {
616 levels: vec![Level {
617 level_idx: 1,
618 level_type: LevelType::Nonoverlapping,
619 table_infos: vec![
620 generate_table(1, 1, 100, 399, 2),
621 generate_table(2, 1, 400, 699, 2),
622 generate_table(3, 1, 700, 999, 2),
623 ],
624 total_file_size: 900,
625 sub_level_id: 0,
626 uncompressed_file_size: 900,
627 ..Default::default()
628 }],
629 l0: generate_l0_nonoverlapping_sublevels(vec![]),
630 ..Default::default()
631 };
632 push_tables_level0_nonoverlapping(
633 &mut levels,
634 vec![
635 generate_table(4, 1, 100, 180, 2),
636 generate_table(5, 1, 400, 450, 2),
637 generate_table(6, 1, 600, 700, 2),
638 ],
639 );
640
641 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
642 let mut local_stats = LocalPickerStatistic::default();
643 levels_handler[0].add_pending_task(1, 4, &levels.l0.sub_levels[0].table_infos);
644 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
645 assert!(ret.is_none());
647 }
648
649 #[test]
650 fn test_l0_to_l1_break_on_exceed_compaction_size() {
651 let mut local_stats = LocalPickerStatistic::default();
652 let mut l0 = generate_l0_overlapping_sublevels(vec![
653 vec![
654 generate_table(4, 1, 10, 90, 1),
655 generate_table(5, 1, 210, 220, 1),
656 ],
657 vec![generate_table(6, 1, 0, 100000, 1)],
658 vec![generate_table(7, 1, 0, 100000, 1)],
659 ]);
660 for s in &mut l0.sub_levels {
662 s.level_type = LevelType::Nonoverlapping;
663 }
664 let levels = Levels {
665 l0,
666 levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])],
667 ..Default::default()
668 };
669 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
670
671 let config = Arc::new(
673 CompactionConfigBuilder::new()
674 .max_compaction_bytes(500000)
675 .sub_level_max_compaction_bytes(50000)
676 .max_bytes_for_level_base(500000)
677 .level0_sub_level_compact_level_count(1)
678 .build(),
679 );
680 let mut picker =
683 LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
684 let ret = picker
685 .pick_compaction(&levels, &levels_handler, &mut local_stats)
686 .unwrap();
687 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 7);
688 assert_eq!(
689 3,
690 ret.input_levels.iter().filter(|l| l.level_idx == 0).count()
691 );
692 assert_eq!(
693 4,
694 ret.input_levels
695 .iter()
696 .filter(|l| l.level_idx == 0)
697 .map(|l| l.table_infos.len())
698 .sum::<usize>()
699 );
700
701 let config = Arc::new(
703 CompactionConfigBuilder::new()
704 .max_compaction_bytes(100010)
705 .max_bytes_for_level_base(512)
706 .level0_sub_level_compact_level_count(1)
707 .build(),
708 );
709 let mut picker =
710 LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
711
712 let ret = picker
713 .pick_compaction(&levels, &levels_handler, &mut local_stats)
714 .unwrap();
715 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
716 assert_eq!(
717 2,
718 ret.input_levels.iter().filter(|l| l.level_idx == 0).count()
719 );
720 assert_eq!(
721 3,
722 ret.input_levels
723 .iter()
724 .filter(|l| l.level_idx == 0)
725 .map(|l| l.table_infos.len())
726 .sum::<usize>()
727 );
728 }
729
730 #[test]
731 fn test_l0_to_l1_break_on_pending_sub_level() {
732 let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
733 vec![
734 generate_table(4, 1, 10, 90, 1),
735 generate_table(5, 1, 210, 220, 1),
736 ],
737 vec![generate_table(6, 1, 0, 100000, 1)],
738 vec![generate_table(7, 1, 0, 100000, 1)],
739 ]);
740
741 let levels = Levels {
742 l0,
743 levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])],
744 ..Default::default()
745 };
746 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
747 let mut local_stats = LocalPickerStatistic::default();
748
749 let pending_level = levels.l0.sub_levels[1].clone();
751 assert_eq!(pending_level.sub_level_id, 1);
752 let tier_task_input = CompactionInput {
753 input_levels: vec![InputLevel {
754 level_idx: 0,
755 level_type: pending_level.level_type,
756 table_infos: pending_level.table_infos.clone(),
757 }],
758 target_level: 1,
759 target_sub_level_id: pending_level.sub_level_id,
760 ..Default::default()
761 };
762 assert!(!levels_handler[0].is_level_pending_compact(&pending_level));
763 tier_task_input.add_pending_task(1, &mut levels_handler);
764 assert!(levels_handler[0].is_level_pending_compact(&pending_level));
765
766 let config = Arc::new(
768 CompactionConfigBuilder::new()
769 .max_compaction_bytes(500000)
770 .level0_sub_level_compact_level_count(2)
771 .build(),
772 );
773
774 let mut picker = LevelCompactionPicker::new(
777 1,
778 config.clone(),
779 Arc::new(CompactionDeveloperConfig::default()),
780 );
781 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
782 assert!(ret.is_none());
783
784 for pending_task_id in &levels_handler[0].pending_tasks_ids() {
786 levels_handler[0].remove_task(*pending_task_id);
787 }
788
789 let mut picker =
791 LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
792 picker
793 .pick_compaction(&levels, &levels_handler, &mut local_stats)
794 .unwrap();
795 }
796
797 #[test]
798 fn test_l0_to_base_when_all_base_pending() {
799 let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
800 vec![
801 generate_table(4, 1, 10, 90, 1),
802 generate_table(5, 1, 1000, 2000, 1),
803 ],
804 vec![generate_table(6, 1, 10, 90, 1)],
805 ]);
806
807 let levels = Levels {
808 l0,
809 levels: vec![generate_level(1, vec![generate_table(3, 1, 1, 100, 1)])],
810 ..Default::default()
811 };
812 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
813 let mut local_stats = LocalPickerStatistic::default();
814
815 let config = Arc::new(
816 CompactionConfigBuilder::new()
817 .max_compaction_bytes(500000)
818 .level0_sub_level_compact_level_count(2)
819 .sub_level_max_compaction_bytes(1000)
820 .build(),
821 );
822
823 let mut picker =
824 LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
825 let ret = picker
826 .pick_compaction(&levels, &levels_handler, &mut local_stats)
827 .unwrap();
828 assert_eq!(2, ret.input_levels.len());
830 assert!(ret.input_levels[1].table_infos.is_empty());
831 assert_eq!(5, ret.input_levels[0].table_infos[0].sst_id);
832 ret.add_pending_task(0, &mut levels_handler);
833
834 let ret = picker
835 .pick_compaction(&levels, &levels_handler, &mut local_stats)
836 .unwrap();
837 assert_eq!(3, ret.input_levels.len());
838 assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id);
839 }
840}