1use std::cell::RefCell;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::config::default::compaction_config;
20use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
21use risingwave_pb::hummock::{CompactionConfig, LevelType};
22
23use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
24use super::{
25 CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
26 ValidationRuleType,
27};
28use crate::hummock::compaction::picker::TrivialMovePicker;
29use crate::hummock::compaction::{CompactionDeveloperConfig, create_overlap_strategy};
30use crate::hummock::level_handler::LevelHandler;
31
thread_local! {
    /// Per-thread counter bumped by `pick_multi_level_to_base` each time a rejected task
    /// qualifies for a warning; only every 100th bump is actually logged.
    static LOG_COUNTER: RefCell<usize> = const { RefCell::new(0) };
}
35
/// Picker that builds compaction tasks moving data from L0 sub-levels into `target_level`.
///
/// `pick_compaction` first tries a trivial move of the first L0 sub-level and otherwise
/// combines several L0 sub-levels with the overlapping SSTs of the target level.
pub struct LevelCompactionPicker {
    /// Index of the level that picked tasks compact into.
    target_level: usize,
    /// Compaction settings read when sizing and partitioning tasks.
    config: Arc<CompactionConfig>,
    /// Validator applied to each candidate task before it is returned.
    compaction_task_validator: Arc<CompactionTaskValidator>,
    /// Developer switches such as `enable_trivial_move` and
    /// `enable_check_task_level_overlap`.
    developer_config: Arc<CompactionDeveloperConfig>,
}
42
43impl CompactionPicker for LevelCompactionPicker {
44 fn pick_compaction(
45 &mut self,
46 levels: &Levels,
47 level_handlers: &[LevelHandler],
48 stats: &mut LocalPickerStatistic,
49 ) -> Option<CompactionInput> {
50 let l0 = &levels.l0;
51 if l0.sub_levels.is_empty() {
52 return None;
53 }
54 if l0.sub_levels[0].level_type != LevelType::Nonoverlapping
55 && l0.sub_levels[0].table_infos.len() > 1
56 {
57 stats.skip_by_overlapping += 1;
58 return None;
59 }
60
61 let is_l0_pending_compact =
62 level_handlers[0].is_level_all_pending_compact(&l0.sub_levels[0]);
63
64 if is_l0_pending_compact {
65 stats.skip_by_pending_files += 1;
66 return None;
67 }
68
69 if let Some(mut ret) = self.pick_base_trivial_move(
70 l0,
71 levels.get_level(self.target_level),
72 level_handlers,
73 stats,
74 ) {
75 ret.vnode_partition_count = self.config.split_weight_by_vnode;
76 return Some(ret);
77 }
78
79 debug_assert!(self.target_level == levels.get_level(self.target_level).level_idx as usize);
80 if let Some(ret) = self.pick_multi_level_to_base(
81 l0,
82 levels.get_level(self.target_level),
83 self.config.split_weight_by_vnode,
84 level_handlers,
85 stats,
86 ) {
87 return Some(ret);
88 }
89
90 None
91 }
92}
93
94impl LevelCompactionPicker {
95 #[cfg(test)]
96 pub fn new(
97 target_level: usize,
98 config: Arc<CompactionConfig>,
99 developer_config: Arc<CompactionDeveloperConfig>,
100 ) -> LevelCompactionPicker {
101 LevelCompactionPicker {
102 target_level,
103 compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
104 config,
105 developer_config,
106 }
107 }
108
109 pub fn new_with_validator(
110 target_level: usize,
111 config: Arc<CompactionConfig>,
112 compaction_task_validator: Arc<CompactionTaskValidator>,
113 developer_config: Arc<CompactionDeveloperConfig>,
114 ) -> LevelCompactionPicker {
115 LevelCompactionPicker {
116 target_level,
117 config,
118 compaction_task_validator,
119 developer_config,
120 }
121 }
122
123 fn pick_base_trivial_move(
124 &self,
125 l0: &OverlappingLevel,
126 target_level: &Level,
127 level_handlers: &[LevelHandler],
128 stats: &mut LocalPickerStatistic,
129 ) -> Option<CompactionInput> {
130 if !self.developer_config.enable_trivial_move {
131 return None;
132 }
133
134 let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
135 let trivial_move_picker = TrivialMovePicker::new(
136 0,
137 self.target_level,
138 overlap_strategy.clone(),
139 if self.compaction_task_validator.is_enable() {
140 self.config.sst_allowed_trivial_move_min_size.unwrap_or(0)
142 } else {
143 0
144 },
145 self.config
146 .sst_allowed_trivial_move_max_count
147 .unwrap_or(compaction_config::sst_allowed_trivial_move_max_count())
148 as usize,
149 );
150
151 trivial_move_picker.pick_trivial_move_task(
152 &l0.sub_levels[0].table_infos,
153 &target_level.table_infos,
154 level_handlers,
155 stats,
156 )
157 }
158
159 fn pick_multi_level_to_base(
160 &self,
161 l0: &OverlappingLevel,
162 target_level: &Level,
163 vnode_partition_count: u32,
164 level_handlers: &[LevelHandler],
165 stats: &mut LocalPickerStatistic,
166 ) -> Option<CompactionInput> {
167 let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
168 let min_compaction_bytes = self.config.sub_level_max_compaction_bytes;
169 let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
170 min_compaction_bytes,
171 std::cmp::max(
174 self.config.max_bytes_for_level_base,
175 self.config.max_compaction_bytes / 2,
176 ),
177 1,
178 self.config.level0_max_compact_file_number,
180 overlap_strategy.clone(),
181 self.developer_config.enable_check_task_level_overlap,
182 self.config
183 .max_l0_compact_level_count
184 .unwrap_or(compaction_config::max_l0_compact_level_count()) as usize,
185 );
186
187 let mut max_vnode_partition_idx = 0;
188 for (idx, level) in l0.sub_levels.iter().enumerate() {
189 if level.vnode_partition_count < vnode_partition_count {
190 break;
191 }
192 max_vnode_partition_idx = idx;
193 }
194
195 let l0_select_tables_vec = non_overlap_sub_level_picker.pick_l0_multi_non_overlap_level(
196 &l0.sub_levels[..=max_vnode_partition_idx],
197 &level_handlers[0],
198 );
199 if l0_select_tables_vec.is_empty() {
200 stats.skip_by_pending_files += 1;
201 return None;
202 }
203
204 let mut skip_by_pending = false;
205 let mut input_levels = vec![];
206
207 for input in l0_select_tables_vec {
208 let l0_select_tables = input
209 .sstable_infos
210 .iter()
211 .flat_map(|select_tables| select_tables.clone())
212 .collect_vec();
213
214 let target_level_ssts = overlap_strategy
215 .check_base_level_overlap(&l0_select_tables, &target_level.table_infos);
216
217 let mut target_level_size = 0;
218 let mut pending_compact = false;
219 for sst in &target_level_ssts {
220 if level_handlers[target_level.level_idx as usize].is_pending_compact(&sst.sst_id) {
221 pending_compact = true;
222 break;
223 }
224
225 target_level_size += sst.sst_size;
226 }
227
228 if pending_compact {
229 skip_by_pending = true;
230 continue;
231 }
232
233 input_levels.push((input, target_level_size, target_level_ssts));
234 }
235
236 if input_levels.is_empty() {
237 if skip_by_pending {
238 stats.skip_by_pending_files += 1;
239 }
240 return None;
241 }
242
243 for (input, target_file_size, target_level_files) in input_levels {
244 let mut select_level_inputs = input
245 .sstable_infos
246 .into_iter()
247 .map(|table_infos| InputLevel {
248 level_idx: 0,
249 level_type: LevelType::Nonoverlapping,
250 table_infos,
251 })
252 .collect_vec();
253 select_level_inputs.reverse();
254 let target_file_count = target_level_files.len();
255 select_level_inputs.push(InputLevel {
256 level_idx: target_level.level_idx,
257 level_type: target_level.level_type,
258 table_infos: target_level_files,
259 });
260
261 let result = CompactionInput {
262 input_levels: select_level_inputs,
263 target_level: self.target_level,
264 select_input_size: input.total_file_size,
265 target_input_size: target_file_size,
266 total_file_count: (input.total_file_count + target_file_count) as u64,
267 vnode_partition_count,
268 ..Default::default()
269 };
270
271 if !self.compaction_task_validator.valid_compact_task(
272 &result,
273 ValidationRuleType::ToBase,
274 stats,
275 ) {
276 if l0.total_file_size > target_level.total_file_size * 8 {
277 let log_counter = LOG_COUNTER.with_borrow_mut(|counter| {
278 *counter += 1;
279 *counter
280 });
281
282 if log_counter % 100 == 0 {
284 tracing::warn!(
285 "skip task with level count: {}, file count: {}, select size: {}, target size: {}, target level size: {}",
286 result.input_levels.len(),
287 result.total_file_count,
288 result.select_input_size,
289 result.target_input_size,
290 target_level.total_file_size,
291 );
292 }
293 }
294 continue;
295 }
296
297 return Some(result);
298 }
299 None
300 }
301}
302
/// Unit tests for [`LevelCompactionPicker`].
///
/// NOTE(review): `generate_table(a, b, c, d, e)` comes from the selector test helpers; the
/// first argument is presumably the SST id (the asserts below compare against it) and the
/// third/fourth presumably the key range — confirm against the helper.
#[cfg(test)]
pub mod tests {

    use super::*;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::selector::tests::*;
    use crate::hummock::compaction::{CompactionMode, TierCompactionPicker};

    /// Builds a picker targeting level 1 with a small test config and the default
    /// developer config.
    fn create_compaction_picker_for_test() -> LevelCompactionPicker {
        let config = Arc::new(
            CompactionConfigBuilder::new()
                .level0_tier_compact_file_number(2)
                .level0_sub_level_compact_level_count(1)
                .build(),
        );
        LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()))
    }

    /// Walks through several consecutive picks from L0 to L1 while tasks are added to and
    /// removed from the level handlers.
    #[test]
    fn test_compact_l0_to_l1() {
        let mut picker = create_compaction_picker_for_test();
        // One L0 sub-level holding SSTs 5 and 4; L1 holds SSTs 3, 2 and 1.
        let l0 = generate_level(
            0,
            vec![
                generate_table(5, 1, 100, 200, 2),
                generate_table(4, 1, 201, 300, 2),
            ],
        );
        let mut levels = Levels {
            l0: OverlappingLevel {
                total_file_size: l0.total_file_size,
                uncompressed_file_size: l0.total_file_size,
                sub_levels: vec![l0],
            },
            levels: vec![generate_level(
                1,
                vec![
                    generate_table(3, 1, 1, 100, 1),
                    generate_table(2, 1, 101, 150, 1),
                    generate_table(1, 1, 201, 210, 1),
                ],
            )],
            ..Default::default()
        };
        let mut local_stats = LocalPickerStatistic::default();
        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];

        // First pick: a single L0 table (SST 4) together with SST 1 from L1.
        let ret = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 4);
        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 1);

        // Register the first pick as task 0, add two more L0 tables and pick again while
        // task 0 is still pending.
        ret.add_pending_task(0, &mut levels_handler);
        {
            push_table_level0_nonoverlapping(&mut levels, generate_table(6, 1, 100, 200, 2));
            push_table_level0_nonoverlapping(&mut levels, generate_table(7, 1, 301, 333, 4));
            let ret2 = picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .unwrap();

            assert_eq!(ret2.input_levels[0].table_infos.len(), 1);
            assert_eq!(ret2.input_levels[0].table_infos[0].sst_id, 6);
            assert_eq!(ret2.input_levels[1].table_infos[0].sst_id, 5);
        }

        // Drop SST 4 from the first sub-level and shrink the L0 size accordingly, then
        // clear task 0 from both handlers.
        levels.l0.sub_levels[0]
            .table_infos
            .retain(|table| table.sst_id != 4);
        levels.l0.total_file_size -= ret.input_levels[0].table_infos[0].file_size;

        levels_handler[0].remove_task(0);
        levels_handler[1].remove_task(0);

        // With nothing pending the pick spans three input levels: SST 6, SST 5 and two L1
        // tables (SSTs 3 and 2).
        let ret = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        assert_eq!(ret.input_levels.len(), 3);
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
        assert_eq!(ret.input_levels[2].table_infos.len(), 2);
        assert_eq!(ret.input_levels[2].table_infos[0].sst_id, 3);
        assert_eq!(ret.input_levels[2].table_infos[1].sst_id, 2);
        ret.add_pending_task(1, &mut levels_handler);

        // While task 1 is pending, adding an overlapping table (SST 8) yields no pick.
        let mut local_stats = LocalPickerStatistic::default();
        push_table_level0_overlapping(&mut levels, generate_table(8, 1, 199, 233, 3));
        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
        assert!(ret.is_none());

        // After task 1 is removed the same three-level pick is produced again.
        levels_handler[0].remove_task(1);
        levels_handler[1].remove_task(1);
        let ret = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        assert_eq!(ret.input_levels.len(), 3);
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
        assert_eq!(ret.input_levels[2].table_infos.len(), 2);
    }

    /// Checks which L1 tables accompany the selected L0 table when the config uses
    /// `CompactionMode::Range`.
    #[test]
    fn test_selecting_key_range_overlap() {
        let config = Arc::new(
            CompactionConfigBuilder::new()
                .level0_tier_compact_file_number(2)
                .compaction_mode(CompactionMode::Range as i32)
                .level0_sub_level_compact_level_count(1)
                .build(),
        );
        let mut picker =
            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));

        // L1 holds SSTs 3, 4 and 5.
        let levels = vec![Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                generate_table(3, 1, 0, 50, 1),
                generate_table(4, 1, 150, 200, 1),
                generate_table(5, 1, 250, 300, 1),
            ],
            ..Default::default()
        }];
        let mut levels = Levels {
            levels,
            l0: OverlappingLevel {
                sub_levels: vec![],
                total_file_size: 0,
                uncompressed_file_size: 0,
            },
            ..Default::default()
        };
        // Two pushes into L0: first SST 1 alone, then SSTs 7 and 8 together.
        push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(1, 1, 50, 140, 2)]);
        push_tables_level0_nonoverlapping(
            &mut levels,
            vec![
                generate_table(7, 1, 200, 250, 2),
                generate_table(8, 1, 400, 500, 2),
            ],
        );

        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];

        let mut local_stats = LocalPickerStatistic::default();
        let ret = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();

        // Expect exactly two input levels: SST 1 from L0 and SST 3 from L1.
        assert_eq!(ret.input_levels.len(), 2);
        assert_eq!(
            ret.input_levels[0]
                .table_infos
                .iter()
                .map(|t| t.sst_id)
                .collect_vec(),
            vec![1]
        );

        assert_eq!(
            ret.input_levels[1]
                .table_infos
                .iter()
                .map(|t| t.sst_id)
                .collect_vec(),
            vec![3,]
        );
    }

    /// After an L0 -> L1 task is pending, a `TierCompactionPicker` run over the same levels
    /// must not pick anything.
    #[test]
    fn test_l0_to_l1_compact_conflict() {
        let mut picker = create_compaction_picker_for_test();
        // L1 starts empty.
        let levels = vec![Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![],
            total_file_size: 0,
            sub_level_id: 0,
            uncompressed_file_size: 0,
            ..Default::default()
        }];
        let mut levels = Levels {
            levels,
            l0: OverlappingLevel {
                sub_levels: vec![],
                total_file_size: 0,
                uncompressed_file_size: 0,
            },
            ..Default::default()
        };
        push_tables_level0_nonoverlapping(
            &mut levels,
            vec![
                generate_table(1, 1, 100, 300, 2),
                generate_table(2, 1, 350, 500, 2),
            ],
        );
        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];

        let mut local_stats = LocalPickerStatistic::default();
        // The level picker finds a task; register it as pending task 0.
        let ret = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        ret.add_pending_task(0, &mut levels_handler);
        // Push one more L0 table (SST 3) after the task became pending.
        push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]);
        let config: CompactionConfig = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(2)
            .max_compaction_bytes(1000)
            .sub_level_max_compaction_bytes(150)
            .max_bytes_for_level_multiplier(1)
            .level0_sub_level_compact_level_count(3)
            .build();
        // Note: this shadows the level picker with a tier picker for the final check.
        let mut picker = TierCompactionPicker::new(Arc::new(config));

        let ret: Option<CompactionInput> =
            picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
        assert!(ret.is_none());
    }

    /// Expects no task when the tables of the only L0 sub-level are already registered as
    /// pending in the L0 handler.
    #[test]
    fn test_skip_compact_write_amplification_limit() {
        let config: CompactionConfig = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(2)
            .max_compaction_bytes(1000)
            .sub_level_max_compaction_bytes(150)
            .max_bytes_for_level_multiplier(1)
            .level0_sub_level_compact_level_count(2)
            .build();
        let mut picker = LevelCompactionPicker::new(
            1,
            Arc::new(config),
            Arc::new(CompactionDeveloperConfig::default()),
        );

        // L1 holds SSTs 1, 2 and 3 with a declared total size of 900.
        let mut levels = Levels {
            levels: vec![Level {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(1, 1, 100, 399, 2),
                    generate_table(2, 1, 400, 699, 2),
                    generate_table(3, 1, 700, 999, 2),
                ],
                total_file_size: 900,
                sub_level_id: 0,
                uncompressed_file_size: 900,
                ..Default::default()
            }],
            l0: generate_l0_nonoverlapping_sublevels(vec![]),
            ..Default::default()
        };
        push_tables_level0_nonoverlapping(
            &mut levels,
            vec![
                generate_table(4, 1, 100, 180, 2),
                generate_table(5, 1, 400, 450, 2),
                generate_table(6, 1, 600, 700, 2),
            ],
        );

        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
        let mut local_stats = LocalPickerStatistic::default();
        // Register the tables of the first L0 sub-level as pending before picking
        // (arguments presumably task id 1 and target level 4 — confirm against LevelHandler).
        levels_handler[0].add_pending_task(1, 4, &levels.l0.sub_levels[0].table_infos);
        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
        assert!(ret.is_none());
    }

    /// Compares how many L0 sub-levels are included in a task under two different size
    /// configurations.
    #[test]
    fn test_l0_to_l1_break_on_exceed_compaction_size() {
        let mut local_stats = LocalPickerStatistic::default();
        // Three L0 sub-levels: [4, 5], [6] and [7].
        let mut l0 = generate_l0_overlapping_sublevels(vec![
            vec![
                generate_table(4, 1, 10, 90, 1),
                generate_table(5, 1, 210, 220, 1),
            ],
            vec![generate_table(6, 1, 0, 100000, 1)],
            vec![generate_table(7, 1, 0, 100000, 1)],
        ]);
        // Force every sub-level to be treated as non-overlapping.
        for s in &mut l0.sub_levels {
            s.level_type = LevelType::Nonoverlapping;
        }
        let levels = Levels {
            l0,
            levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])],
            ..Default::default()
        };
        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];

        // Larger limits: all three L0 sub-levels (four tables) end up in the task, with
        // SST 7 first.
        let config = Arc::new(
            CompactionConfigBuilder::new()
                .max_compaction_bytes(500000)
                .sub_level_max_compaction_bytes(50000)
                .max_bytes_for_level_base(500000)
                .level0_sub_level_compact_level_count(1)
                .build(),
        );
        let mut picker =
            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
        let ret = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 7);
        assert_eq!(
            3,
            ret.input_levels.iter().filter(|l| l.level_idx == 0).count()
        );
        assert_eq!(
            4,
            ret.input_levels
                .iter()
                .filter(|l| l.level_idx == 0)
                .map(|l| l.table_infos.len())
                .sum::<usize>()
        );

        // Smaller limits: only two L0 sub-levels (three tables) are included, with SST 6
        // first.
        let config = Arc::new(
            CompactionConfigBuilder::new()
                .max_compaction_bytes(100010)
                .max_bytes_for_level_base(512)
                .level0_sub_level_compact_level_count(1)
                .build(),
        );
        let mut picker =
            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));

        let ret = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
        assert_eq!(
            2,
            ret.input_levels.iter().filter(|l| l.level_idx == 0).count()
        );
        assert_eq!(
            3,
            ret.input_levels
                .iter()
                .filter(|l| l.level_idx == 0)
                .map(|l| l.table_infos.len())
                .sum::<usize>()
        );
    }

    /// A pending task on an intermediate L0 sub-level blocks picking until that task is
    /// removed.
    #[test]
    fn test_l0_to_l1_break_on_pending_sub_level() {
        let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
            vec![
                generate_table(4, 1, 10, 90, 1),
                generate_table(5, 1, 210, 220, 1),
            ],
            vec![generate_table(6, 1, 0, 100000, 1)],
            vec![generate_table(7, 1, 0, 100000, 1)],
        ]);

        let levels = Levels {
            l0,
            levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])],
            ..Default::default()
        };
        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
        let mut local_stats = LocalPickerStatistic::default();

        // Build a task over the second sub-level (sub_level_id 1) and register it as
        // pending task 1.
        let pending_level = levels.l0.sub_levels[1].clone();
        assert_eq!(pending_level.sub_level_id, 1);
        let tier_task_input = CompactionInput {
            input_levels: vec![InputLevel {
                level_idx: 0,
                level_type: pending_level.level_type,
                table_infos: pending_level.table_infos.clone(),
            }],
            target_level: 1,
            target_sub_level_id: pending_level.sub_level_id,
            ..Default::default()
        };
        assert!(!levels_handler[0].is_level_pending_compact(&pending_level));
        tier_task_input.add_pending_task(1, &mut levels_handler);
        assert!(levels_handler[0].is_level_pending_compact(&pending_level));

        let config = Arc::new(
            CompactionConfigBuilder::new()
                .max_compaction_bytes(500000)
                .level0_sub_level_compact_level_count(2)
                .build(),
        );

        // With the sub-level pending, nothing is picked.
        let mut picker = LevelCompactionPicker::new(
            1,
            config.clone(),
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
        assert!(ret.is_none());

        // Clear every pending task from the L0 handler.
        for pending_task_id in &levels_handler[0].pending_tasks_ids() {
            levels_handler[0].remove_task(*pending_task_id);
        }

        // Now a fresh picker with the same config finds a task.
        let mut picker =
            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
        picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
    }

    /// Two consecutive picks: the first involves no base-level tables, the second spans
    /// three input levels.
    #[test]
    fn test_l0_to_base_when_all_base_pending() {
        // L0 sub-levels: [4, 5] and [6]; L1 holds SST 3.
        let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
            vec![
                generate_table(4, 1, 10, 90, 1),
                generate_table(5, 1, 1000, 2000, 1),
            ],
            vec![generate_table(6, 1, 10, 90, 1)],
        ]);

        let levels = Levels {
            l0,
            levels: vec![generate_level(1, vec![generate_table(3, 1, 1, 100, 1)])],
            ..Default::default()
        };
        let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
        let mut local_stats = LocalPickerStatistic::default();

        let config = Arc::new(
            CompactionConfigBuilder::new()
                .max_compaction_bytes(500000)
                .level0_sub_level_compact_level_count(2)
                .sub_level_max_compaction_bytes(1000)
                .build(),
        );

        let mut picker =
            LevelCompactionPicker::new(1, config, Arc::new(CompactionDeveloperConfig::default()));
        // First pick: SST 5 from L0 and an empty set of base-level tables.
        let ret = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        assert_eq!(2, ret.input_levels.len());
        assert!(ret.input_levels[1].table_infos.is_empty());
        assert_eq!(5, ret.input_levels[0].table_infos[0].sst_id);
        ret.add_pending_task(0, &mut levels_handler);

        // Second pick, with task 0 pending: three input levels starting with SST 6.
        let ret = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        assert_eq!(3, ret.input_levels.len());
        assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id);
    }
}