1use std::cmp::Ordering;
16use std::collections::HashMap;
17
18use risingwave_hummock_sdk::compact_task::CompactTaskAssignment;
19use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
20use risingwave_hummock_sdk::level::InputLevel;
21use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
22
23use crate::hummock::compaction::picker::CompactionInput;
24
/// Identifies the container a compaction writes into.
///
/// A target level of `0` is tracked per L0 sub-level; every other target is
/// tracked per level index.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum TargetContainer {
    /// A non-L0 level, identified by its level index.
    Level(u32),
    /// An L0 sub-level, identified by its sub-level id.
    L0SubLevel(u64),
}
30
31#[derive(Clone, Debug, PartialEq, Eq)]
32struct CompactionTargetRange {
33 target_container: TargetContainer,
34 key_range: KeyRange,
35}
36
37impl CompactionTargetRange {
38 fn from_input_levels(
39 target_level: u32,
40 target_sub_level_id: u64,
41 input_levels: &[InputLevel],
42 ) -> Option<Self> {
43 let key_range = input_key_range(input_levels)?;
44 Some(Self {
45 target_container: target_container(target_level, target_sub_level_id),
46 key_range,
47 })
48 }
49}
50
51#[derive(Clone, Debug, PartialEq, Eq)]
52struct InProgressTargetRange {
53 task_id: HummockCompactionTaskId,
54 key_range: KeyRange,
55}
56
57#[derive(Default)]
58pub struct InProgressCompactionView {
59 ranges_by_target_container: HashMap<TargetContainer, Vec<InProgressTargetRange>>,
60}
61
62impl InProgressCompactionView {
63 pub(crate) fn for_group<'a>(
64 assignments: impl IntoIterator<Item = &'a CompactTaskAssignment>,
65 compaction_group_id: CompactionGroupId,
66 ) -> Self {
67 let mut view = Self::default();
68 for assignment in assignments {
69 let task = &assignment.compact_task;
70 if task.compaction_group_id != compaction_group_id {
71 continue;
72 }
73
74 let Some(target_range) = CompactionTargetRange::from_input_levels(
75 task.target_level,
76 task.target_sub_level_id,
77 &task.input_ssts,
78 ) else {
79 continue;
80 };
81 view.ranges_by_target_container
82 .entry(target_range.target_container)
83 .or_default()
84 .push(InProgressTargetRange {
85 task_id: task.task_id,
86 key_range: target_range.key_range,
87 });
88 }
89
90 for ranges in view.ranges_by_target_container.values_mut() {
92 ranges.sort_by(|left, right| left.key_range.cmp(&right.key_range));
93 assert!(
94 ranges
95 .windows(2)
96 .all(|pair| !pair[0].key_range.sstable_overlap(&pair[1].key_range)),
97 "in-progress target ranges must not overlap within the same target container"
98 );
99 }
100
101 view
102 }
103
104 pub(crate) fn has_conflict_with_input(&self, input: &CompactionInput) -> bool {
105 let target_range = CompactionTargetRange::from_input_levels(
106 input.target_level as u32,
107 input.target_sub_level_id,
108 &input.input_levels,
109 );
110 target_range
111 .as_ref()
112 .and_then(|target_range| self.conflict_target_range(target_range))
113 .is_some()
114 }
115
116 fn conflict_target_range(
117 &self,
118 target_range: &CompactionTargetRange,
119 ) -> Option<HummockCompactionTaskId> {
120 let ranges = self
121 .ranges_by_target_container
122 .get(&target_range.target_container)?;
123 let pos = ranges.partition_point(|range| {
127 range
128 .key_range
129 .compare_right_with(&target_range.key_range.left)
130 == Ordering::Less
131 });
132
133 ranges.get(pos).and_then(|range| {
134 target_range
135 .key_range
136 .sstable_overlap(&range.key_range)
137 .then_some(range.task_id)
138 })
139 }
140}
141
142fn target_container(target_level: u32, target_sub_level_id: u64) -> TargetContainer {
143 match target_level {
144 0 => TargetContainer::L0SubLevel(target_sub_level_id),
145 level => TargetContainer::Level(level),
146 }
147}
148
149fn input_key_range(input_levels: &[InputLevel]) -> Option<KeyRange> {
150 let mut range: Option<KeyRange> = None;
151 for level in input_levels {
152 for sst in &level.table_infos {
153 assert!(
154 !sst.key_range.inf_key_range(),
155 "input SST key range for pending compaction target range must be finite: level_idx={}, sst_id={}",
156 level.level_idx,
157 sst.sst_id,
158 );
159 match range.as_mut() {
160 Some(range) => range.full_key_extend(&sst.key_range),
161 None => range = Some(sst.key_range.clone()),
162 }
163 }
164 }
165 range
166}
167
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use risingwave_hummock_sdk::compact_task::{CompactTask, CompactTaskAssignment};
    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
    use risingwave_pb::hummock::LevelType;

    use super::*;

    // ---- fixtures -------------------------------------------------------

    /// Encodes the test key for index `idx`.
    fn test_key(idx: usize) -> Bytes {
        Bytes::from(crate::hummock::test_utils::iterator_test_key_of_epoch(
            1, idx, 1,
        ))
    }

    /// SST covering the test keys `left..right`, with a configurable right
    /// bound exclusivity.
    fn sst_with_right_bound(
        sst_id: u64,
        left: usize,
        right: usize,
        right_exclusive: bool,
    ) -> SstableInfo {
        SstableInfoInner {
            object_id: sst_id.into(),
            sst_id: sst_id.into(),
            sst_size: (right - left) as u64,
            key_range: KeyRange {
                left: test_key(left),
                right: test_key(right),
                right_exclusive,
            },
            ..Default::default()
        }
        .into()
    }

    /// SST covering the test keys `left..=right` (inclusive right bound).
    fn sst(sst_id: u64, left: usize, right: usize) -> SstableInfo {
        sst_with_right_bound(sst_id, left, right, false)
    }

    fn input_level(
        level_idx: u32,
        level_type: LevelType,
        table_infos: Vec<SstableInfo>,
    ) -> InputLevel {
        InputLevel {
            level_idx,
            level_type,
            table_infos,
        }
    }

    /// Non-overlapping input level at index 0 holding `table_infos`.
    fn l0_nonoverlapping(table_infos: Vec<SstableInfo>) -> InputLevel {
        input_level(0, LevelType::Nonoverlapping, table_infos)
    }

    fn compact_task(
        task_id: u64,
        compaction_group_id: u64,
        target_level: u32,
        target_sub_level_id: u64,
        input_ssts: Vec<InputLevel>,
    ) -> CompactTask {
        CompactTask {
            task_id,
            compaction_group_id: compaction_group_id.into(),
            target_level,
            target_sub_level_id,
            input_ssts,
            ..Default::default()
        }
    }

    /// Task whose only input is `table`, placed in a non-overlapping level 0.
    fn single_sst_task(
        task_id: u64,
        compaction_group_id: u64,
        target_level: u32,
        target_sub_level_id: u64,
        table: SstableInfo,
    ) -> CompactTask {
        compact_task(
            task_id,
            compaction_group_id,
            target_level,
            target_sub_level_id,
            vec![l0_nonoverlapping(vec![table])],
        )
    }

    fn compaction_input(
        target_level: usize,
        target_sub_level_id: u64,
        input_levels: Vec<InputLevel>,
    ) -> CompactionInput {
        CompactionInput {
            input_levels,
            target_level,
            target_sub_level_id,
            ..Default::default()
        }
    }

    fn assignment(task: CompactTask) -> CompactTaskAssignment {
        CompactTaskAssignment {
            compact_task: task,
            context_id: 1.into(),
        }
    }

    fn view_for_group(
        tasks: Vec<CompactTask>,
        compaction_group_id: u64,
    ) -> InProgressCompactionView {
        let assignments: Vec<_> = tasks.into_iter().map(assignment).collect();
        InProgressCompactionView::for_group(&assignments, compaction_group_id.into())
    }

    /// Target range for the given inputs; the inputs must contain an SST.
    fn target_range(
        target_level: u32,
        target_sub_level_id: u64,
        input_levels: &[InputLevel],
    ) -> CompactionTargetRange {
        CompactionTargetRange::from_input_levels(target_level, target_sub_level_id, input_levels)
            .unwrap()
    }

    /// Target range whose only input is `table` in a non-overlapping level 0.
    fn single_sst_range(
        target_level: u32,
        target_sub_level_id: u64,
        table: SstableInfo,
    ) -> CompactionTargetRange {
        target_range(
            target_level,
            target_sub_level_id,
            &[l0_nonoverlapping(vec![table])],
        )
    }

    // ---- target range construction --------------------------------------

    #[test]
    fn target_range_uses_apply_target_container() {
        let cases = [
            (
                4,
                0,
                vec![
                    l0_nonoverlapping(vec![sst(2, 10, 20)]),
                    input_level(4, LevelType::Nonoverlapping, vec![sst(1, 1, 5)]),
                ],
                TargetContainer::Level(4),
            ),
            (
                0,
                42,
                vec![l0_nonoverlapping(vec![sst(1, 2, 4)])],
                TargetContainer::L0SubLevel(42),
            ),
            (
                0,
                42,
                vec![input_level(0, LevelType::Overlapping, vec![sst(1, 2, 4)])],
                TargetContainer::L0SubLevel(42),
            ),
            (
                0,
                42,
                vec![
                    l0_nonoverlapping(vec![sst(1, 2, 4)]),
                    input_level(0, LevelType::Overlapping, vec![sst(2, 5, 7)]),
                ],
                TargetContainer::L0SubLevel(42),
            ),
        ];

        for (target_level, target_sub_level_id, input_levels, expected_container) in cases {
            let range = target_range(target_level, target_sub_level_id, &input_levels);

            assert_eq!(range.target_container, expected_container);
        }
    }

    #[test]
    #[should_panic(
        expected = "input SST key range for pending compaction target range must be finite"
    )]
    fn target_range_rejects_infinite_input_sst_key_range() {
        let infinite_sst: SstableInfo = SstableInfoInner {
            object_id: 1.into(),
            sst_id: 1.into(),
            key_range: KeyRange::inf(),
            ..Default::default()
        }
        .into();

        let _ = CompactionTargetRange::from_input_levels(
            4,
            0,
            &[l0_nonoverlapping(vec![infinite_sst])],
        );
    }

    // ---- conflict detection ---------------------------------------------

    #[test]
    fn target_conflict_requires_same_container_and_overlapping_key_range() {
        let in_progress = view_for_group(vec![single_sst_task(7, 9, 4, 0, sst(1, 10, 20))], 9);

        let overlapping = single_sst_range(4, 0, sst(2, 15, 25));
        let different_level = single_sst_range(5, 0, sst(3, 15, 25));
        let disjoint = single_sst_range(4, 0, sst(4, 30, 40));
        let touching_right_boundary = single_sst_range(4, 0, sst(5, 20, 30));

        assert_eq!(in_progress.conflict_target_range(&overlapping), Some(7));
        assert_eq!(in_progress.conflict_target_range(&different_level), None);
        assert_eq!(in_progress.conflict_target_range(&disjoint), None);
        assert_eq!(
            in_progress.conflict_target_range(&touching_right_boundary),
            Some(7)
        );
    }

    #[test]
    fn target_conflict_from_input_ignores_skip_policy() {
        let in_progress = view_for_group(vec![single_sst_task(7, 9, 4, 0, sst(1, 10, 20))], 9);
        let mut input = compaction_input(4, 0, vec![l0_nonoverlapping(vec![sst(2, 15, 25)])]);

        assert!(in_progress.has_conflict_with_input(&input));

        // Flipping the skip flag must not change the answer.
        input.skip_target_range_conflict_check = true;

        assert!(in_progress.has_conflict_with_input(&input));
    }

    #[test]
    fn target_conflict_respects_right_exclusive_boundary() {
        let in_progress = view_for_group(
            vec![single_sst_task(
                7,
                9,
                4,
                0,
                sst_with_right_bound(1, 10, 20, true),
            )],
            9,
        );
        let touching_exclusive_boundary = single_sst_range(4, 0, sst(2, 20, 30));

        assert_eq!(
            in_progress.conflict_target_range(&touching_exclusive_boundary),
            None
        );
    }

    #[test]
    fn target_conflict_uses_input_key_range_envelope() {
        let in_progress = view_for_group(
            vec![compact_task(
                7,
                9,
                4,
                0,
                vec![l0_nonoverlapping(vec![sst(1, 10, 20), sst(2, 40, 50)])],
            )],
            9,
        );
        // Falls between the two input SSTs of task 7.
        let gap_inside_input_envelope = single_sst_range(4, 0, sst(3, 25, 35));

        assert_eq!(
            in_progress.conflict_target_range(&gap_inside_input_envelope),
            Some(7)
        );
    }

    #[test]
    fn target_conflict_finds_later_range_in_same_container() {
        let in_progress = view_for_group(
            vec![
                single_sst_task(7, 9, 4, 0, sst(1, 10, 20)),
                single_sst_task(8, 9, 4, 0, sst(2, 40, 50)),
                single_sst_task(9, 9, 4, 0, sst(3, 70, 80)),
            ],
            9,
        );
        let overlapping_later = single_sst_range(4, 0, sst(4, 45, 46));
        let disjoint_gap = single_sst_range(4, 0, sst(5, 25, 35));

        assert_eq!(
            in_progress.conflict_target_range(&overlapping_later),
            Some(8)
        );
        assert_eq!(in_progress.conflict_target_range(&disjoint_gap), None);
    }

    #[test]
    fn target_conflict_checks_non_overlapping_l0_sub_level_container() {
        let in_progress = view_for_group(vec![single_sst_task(7, 9, 0, 42, sst(1, 10, 20))], 9);
        let same_sub_level = single_sst_range(0, 42, sst(2, 15, 25));
        let different_sub_level = single_sst_range(0, 43, sst(3, 15, 25));

        assert_eq!(in_progress.conflict_target_range(&same_sub_level), Some(7));
        assert_eq!(
            in_progress.conflict_target_range(&different_sub_level),
            None
        );
    }

    // ---- projection from assignments ------------------------------------

    #[test]
    #[should_panic(expected = "in-progress target ranges must not overlap")]
    fn assignment_projection_rejects_overlapping_target_ranges() {
        let assignments = vec![
            assignment(single_sst_task(7, 9, 4, 0, sst(1, 10, 20))),
            assignment(single_sst_task(8, 9, 4, 0, sst(2, 15, 25))),
        ];

        let _ = InProgressCompactionView::for_group(&assignments, 9.into());
    }

    #[test]
    fn assignment_projection_filters_by_compaction_group_and_target_range() {
        let group_9_task = single_sst_task(7, 9, 4, 0, sst(1, 10, 20));
        let group_10_task = single_sst_task(8, 10, 4, 0, sst(2, 30, 40));
        let group_9_l0_task = compact_task(
            9,
            9,
            0,
            0,
            vec![input_level(0, LevelType::Overlapping, vec![sst(3, 50, 60)])],
        );
        let assignments = vec![
            assignment(group_9_task),
            assignment(group_10_task),
            assignment(group_9_l0_task),
        ];

        let projection = InProgressCompactionView::for_group(&assignments, 9.into());

        let group_9_range = single_sst_range(4, 0, sst(4, 15, 16));
        let group_10_range = single_sst_range(4, 0, sst(5, 35, 36));
        let group_9_l0_range = target_range(
            0,
            0,
            &[input_level(0, LevelType::Overlapping, vec![sst(6, 55, 56)])],
        );

        assert_eq!(projection.conflict_target_range(&group_9_range), Some(7));
        assert_eq!(projection.conflict_target_range(&group_10_range), None);
        assert_eq!(projection.conflict_target_range(&group_9_l0_range), Some(9));
    }
}