Skip to main content

risingwave_meta/hummock/compaction/
in_progress_compaction.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashMap;
17
18use risingwave_hummock_sdk::compact_task::CompactTaskAssignment;
19use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
20use risingwave_hummock_sdk::level::InputLevel;
21use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
22
23use crate::hummock::compaction::picker::CompactionInput;
24
/// Identifies the container a compaction targets, used as the bucket key when grouping
/// in-progress target ranges.
///
/// Built by `target_container`: target level `0` maps to [`TargetContainer::L0SubLevel`],
/// every other target level maps to [`TargetContainer::Level`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum TargetContainer {
    /// A non-zero target level, carrying the level index.
    Level(u32),
    /// A target inside L0, carrying the target sub-level id.
    L0SubLevel(u64),
}
30
/// The target container of a compaction together with the key range derived from its inputs.
#[derive(Clone, Debug, PartialEq, Eq)]
struct CompactionTargetRange {
    /// Container (level or L0 sub-level) the compaction targets.
    target_container: TargetContainer,
    /// Key range obtained by extending over the key ranges of all input SSTs.
    key_range: KeyRange,
}
36
37impl CompactionTargetRange {
38    fn from_input_levels(
39        target_level: u32,
40        target_sub_level_id: u64,
41        input_levels: &[InputLevel],
42    ) -> Option<Self> {
43        let key_range = input_key_range(input_levels)?;
44        Some(Self {
45            target_container: target_container(target_level, target_sub_level_id),
46            key_range,
47        })
48    }
49}
50
/// The key range claimed by one in-progress compaction task inside a single target container.
#[derive(Clone, Debug, PartialEq, Eq)]
struct InProgressTargetRange {
    /// Id of the in-progress task that owns this range; reported when a conflict is found.
    task_id: HummockCompactionTaskId,
    /// Key range derived from the task's input SSTs.
    key_range: KeyRange,
}
56
/// Per-compaction-group view of the target ranges held by in-progress compaction tasks.
///
/// Used to detect whether a new compaction input would conflict with a task that is already
/// running against the same target container.
#[derive(Default)]
pub struct InProgressCompactionView {
    /// In-progress ranges bucketed by target container. `for_group` sorts each bucket by key
    /// range and asserts that its ranges do not overlap, which the lookup relies on.
    ranges_by_target_container: HashMap<TargetContainer, Vec<InProgressTargetRange>>,
}
61
62impl InProgressCompactionView {
63    pub(crate) fn for_group<'a>(
64        assignments: impl IntoIterator<Item = &'a CompactTaskAssignment>,
65        compaction_group_id: CompactionGroupId,
66    ) -> Self {
67        let mut view = Self::default();
68        for assignment in assignments {
69            let task = &assignment.compact_task;
70            if task.compaction_group_id != compaction_group_id {
71                continue;
72            }
73
74            let Some(target_range) = CompactionTargetRange::from_input_levels(
75                task.target_level,
76                task.target_sub_level_id,
77                &task.input_ssts,
78            ) else {
79                continue;
80            };
81            view.ranges_by_target_container
82                .entry(target_range.target_container)
83                .or_default()
84                .push(InProgressTargetRange {
85                    task_id: task.task_id,
86                    key_range: target_range.key_range,
87                });
88        }
89
90        // Finalize each target bucket for binary-search based overlap checks.
91        for ranges in view.ranges_by_target_container.values_mut() {
92            ranges.sort_by(|left, right| left.key_range.cmp(&right.key_range));
93            assert!(
94                ranges
95                    .windows(2)
96                    .all(|pair| !pair[0].key_range.sstable_overlap(&pair[1].key_range)),
97                "in-progress target ranges must not overlap within the same target container"
98            );
99        }
100
101        view
102    }
103
104    pub(crate) fn has_conflict_with_input(&self, input: &CompactionInput) -> bool {
105        let target_range = CompactionTargetRange::from_input_levels(
106            input.target_level as u32,
107            input.target_sub_level_id,
108            &input.input_levels,
109        );
110        target_range
111            .as_ref()
112            .and_then(|target_range| self.conflict_target_range(target_range))
113            .is_some()
114    }
115
116    fn conflict_target_range(
117        &self,
118        target_range: &CompactionTargetRange,
119    ) -> Option<HummockCompactionTaskId> {
120        let ranges = self
121            .ranges_by_target_container
122            .get(&target_range.target_container)?;
123        // Follow the range overlap strategy: `compare_right_with` skips ranges that end before the
124        // candidate, and `sstable_overlap` performs the final right-exclusive-aware check. Ranges
125        // in the same target container are non-overlapping, so the first remaining range is enough.
126        let pos = ranges.partition_point(|range| {
127            range
128                .key_range
129                .compare_right_with(&target_range.key_range.left)
130                == Ordering::Less
131        });
132
133        ranges.get(pos).and_then(|range| {
134            target_range
135                .key_range
136                .sstable_overlap(&range.key_range)
137                .then_some(range.task_id)
138        })
139    }
140}
141
142fn target_container(target_level: u32, target_sub_level_id: u64) -> TargetContainer {
143    match target_level {
144        0 => TargetContainer::L0SubLevel(target_sub_level_id),
145        level => TargetContainer::Level(level),
146    }
147}
148
149fn input_key_range(input_levels: &[InputLevel]) -> Option<KeyRange> {
150    let mut range: Option<KeyRange> = None;
151    for level in input_levels {
152        for sst in &level.table_infos {
153            assert!(
154                !sst.key_range.inf_key_range(),
155                "input SST key range for pending compaction target range must be finite: level_idx={}, sst_id={}",
156                level.level_idx,
157                sst.sst_id,
158            );
159            match range.as_mut() {
160                Some(range) => range.full_key_extend(&sst.key_range),
161                None => range = Some(sst.key_range.clone()),
162            }
163        }
164    }
165    range
166}
167
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use risingwave_hummock_sdk::compact_task::{CompactTask, CompactTaskAssignment};
    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
    use risingwave_pb::hummock::LevelType;

    use super::*;

    /// Encodes key index `idx` as a test key; the first and third arguments of the test-key
    /// helper are fixed to `1` for every key used in this module.
    fn test_key(idx: usize) -> Bytes {
        Bytes::from(crate::hummock::test_utils::iterator_test_key_of_epoch(
            1, idx, 1,
        ))
    }

    /// Builds an SST whose key range spans the test keys `left..right`, with an explicit
    /// `right_exclusive` flag. `sst_size` is set to `right - left`.
    fn sst_with_bound(
        sst_id: u64,
        left: usize,
        right: usize,
        right_exclusive: bool,
    ) -> SstableInfo {
        SstableInfoInner {
            object_id: sst_id.into(),
            sst_id: sst_id.into(),
            sst_size: (right - left) as u64,
            key_range: KeyRange {
                left: test_key(left),
                right: test_key(right),
                right_exclusive,
            },
            ..Default::default()
        }
        .into()
    }

    /// Builds an SST with a right-inclusive key range over the test keys `left..=right`.
    fn sst(sst_id: u64, left: usize, right: usize) -> SstableInfo {
        sst_with_bound(sst_id, left, right, false)
    }

    /// Builds an input level from its parts.
    fn input_level(
        level_idx: u32,
        level_type: LevelType,
        table_infos: Vec<SstableInfo>,
    ) -> InputLevel {
        InputLevel {
            level_idx,
            level_type,
            table_infos,
        }
    }

    /// Wraps `table_infos` into the most common fixture: a single non-overlapping L0 input
    /// level.
    fn l0_input(table_infos: Vec<SstableInfo>) -> Vec<InputLevel> {
        vec![input_level(0, LevelType::Nonoverlapping, table_infos)]
    }

    /// Builds the target range of a candidate whose only input is `table_info` in a single
    /// non-overlapping L0 input level.
    fn candidate(
        target_level: u32,
        target_sub_level_id: u64,
        table_info: SstableInfo,
    ) -> CompactionTargetRange {
        CompactionTargetRange::from_input_levels(
            target_level,
            target_sub_level_id,
            &l0_input(vec![table_info]),
        )
        .expect("an input with at least one SST always yields a target range")
    }

    /// Builds a compact task with the given identity, target position and inputs; all other
    /// fields keep their defaults.
    fn compact_task(
        task_id: u64,
        compaction_group_id: u64,
        target_level: u32,
        target_sub_level_id: u64,
        input_ssts: Vec<InputLevel>,
    ) -> CompactTask {
        CompactTask {
            task_id,
            compaction_group_id: compaction_group_id.into(),
            target_level,
            target_sub_level_id,
            input_ssts,
            ..Default::default()
        }
    }

    /// Builds a picker input with the given target position and inputs; all other fields keep
    /// their defaults.
    fn compaction_input(
        target_level: usize,
        target_sub_level_id: u64,
        input_levels: Vec<InputLevel>,
    ) -> CompactionInput {
        CompactionInput {
            input_levels,
            target_level,
            target_sub_level_id,
            ..Default::default()
        }
    }

    /// Assigns `task` to a fixed context id.
    fn assignment(task: CompactTask) -> CompactTaskAssignment {
        CompactTaskAssignment {
            compact_task: task,
            context_id: 1.into(),
        }
    }

    /// Builds the in-progress view of `compaction_group_id` from `tasks`.
    fn view_for_group(
        tasks: Vec<CompactTask>,
        compaction_group_id: u64,
    ) -> InProgressCompactionView {
        let assignments: Vec<_> = tasks.into_iter().map(assignment).collect();
        InProgressCompactionView::for_group(&assignments, compaction_group_id.into())
    }

    /// The target container depends only on the target level and sub-level id, not on the
    /// input levels or their level types.
    #[test]
    fn target_range_uses_apply_target_container() {
        for (target_level, target_sub_level_id, input_levels, expected_container) in [
            (
                4,
                0,
                vec![
                    input_level(0, LevelType::Nonoverlapping, vec![sst(2, 10, 20)]),
                    input_level(4, LevelType::Nonoverlapping, vec![sst(1, 1, 5)]),
                ],
                TargetContainer::Level(4),
            ),
            (
                0,
                42,
                l0_input(vec![sst(1, 2, 4)]),
                TargetContainer::L0SubLevel(42),
            ),
            (
                0,
                42,
                vec![input_level(0, LevelType::Overlapping, vec![sst(1, 2, 4)])],
                TargetContainer::L0SubLevel(42),
            ),
            (
                0,
                42,
                vec![
                    input_level(0, LevelType::Nonoverlapping, vec![sst(1, 2, 4)]),
                    input_level(0, LevelType::Overlapping, vec![sst(2, 5, 7)]),
                ],
                TargetContainer::L0SubLevel(42),
            ),
        ] {
            let range = CompactionTargetRange::from_input_levels(
                target_level,
                target_sub_level_id,
                &input_levels,
            )
            .unwrap();

            assert_eq!(range.target_container, expected_container);
        }
    }

    /// An input SST with an infinite key range is rejected with a panic.
    #[test]
    #[should_panic(
        expected = "input SST key range for pending compaction target range must be finite"
    )]
    fn target_range_rejects_infinite_input_sst_key_range() {
        let _ = CompactionTargetRange::from_input_levels(
            4,
            0,
            &l0_input(vec![
                SstableInfoInner {
                    object_id: 1.into(),
                    sst_id: 1.into(),
                    key_range: KeyRange::inf(),
                    ..Default::default()
                }
                .into(),
            ]),
        );
    }

    /// A conflict needs both the same target container and an overlapping key range; a
    /// candidate touching an inclusive right boundary still overlaps.
    #[test]
    fn target_conflict_requires_same_container_and_overlapping_key_range() {
        let in_progress = view_for_group(
            vec![compact_task(7, 9, 4, 0, l0_input(vec![sst(1, 10, 20)]))],
            9,
        );
        let overlapping = candidate(4, 0, sst(2, 15, 25));
        let different_level = candidate(5, 0, sst(3, 15, 25));
        let disjoint = candidate(4, 0, sst(4, 30, 40));
        let touching_right_boundary = candidate(4, 0, sst(5, 20, 30));

        assert_eq!(in_progress.conflict_target_range(&overlapping), Some(7));
        assert_eq!(in_progress.conflict_target_range(&different_level), None);
        assert_eq!(in_progress.conflict_target_range(&disjoint), None);
        assert_eq!(
            in_progress.conflict_target_range(&touching_right_boundary),
            Some(7)
        );
    }

    /// `has_conflict_with_input` reports the conflict regardless of the input's
    /// `skip_target_range_conflict_check` flag.
    #[test]
    fn target_conflict_from_input_ignores_skip_policy() {
        let in_progress = view_for_group(
            vec![compact_task(7, 9, 4, 0, l0_input(vec![sst(1, 10, 20)]))],
            9,
        );
        let mut input = compaction_input(4, 0, l0_input(vec![sst(2, 15, 25)]));

        assert!(in_progress.has_conflict_with_input(&input));
        input.skip_target_range_conflict_check = true;

        assert!(in_progress.has_conflict_with_input(&input));
    }

    /// A candidate starting exactly at a right-exclusive boundary does not conflict.
    #[test]
    fn target_conflict_respects_right_exclusive_boundary() {
        let in_progress = view_for_group(
            vec![compact_task(
                7,
                9,
                4,
                0,
                l0_input(vec![sst_with_bound(1, 10, 20, true)]),
            )],
            9,
        );
        let touching_exclusive_boundary = candidate(4, 0, sst(2, 20, 30));

        assert_eq!(
            in_progress.conflict_target_range(&touching_exclusive_boundary),
            None
        );
    }

    /// The in-progress range is the envelope of all input SSTs, so a candidate falling into
    /// the gap between two input SSTs still conflicts.
    #[test]
    fn target_conflict_uses_input_key_range_envelope() {
        let in_progress = view_for_group(
            vec![compact_task(
                7,
                9,
                4,
                0,
                l0_input(vec![sst(1, 10, 20), sst(2, 40, 50)]),
            )],
            9,
        );
        let gap_inside_input_envelope = candidate(4, 0, sst(3, 25, 35));

        assert_eq!(
            in_progress.conflict_target_range(&gap_inside_input_envelope),
            Some(7)
        );
    }

    /// With several disjoint ranges in one container, the lookup finds a later range and
    /// reports no conflict for a candidate in a gap between ranges.
    #[test]
    fn target_conflict_finds_later_range_in_same_container() {
        let in_progress = view_for_group(
            vec![
                compact_task(7, 9, 4, 0, l0_input(vec![sst(1, 10, 20)])),
                compact_task(8, 9, 4, 0, l0_input(vec![sst(2, 40, 50)])),
                compact_task(9, 9, 4, 0, l0_input(vec![sst(3, 70, 80)])),
            ],
            9,
        );
        let overlapping_later = candidate(4, 0, sst(4, 45, 46));
        let disjoint_gap = candidate(4, 0, sst(5, 25, 35));

        assert_eq!(
            in_progress.conflict_target_range(&overlapping_later),
            Some(8)
        );
        assert_eq!(in_progress.conflict_target_range(&disjoint_gap), None);
    }

    /// L0 targets are bucketed per sub-level id: the same sub-level conflicts, a different
    /// one does not.
    #[test]
    fn target_conflict_checks_non_overlapping_l0_sub_level_container() {
        let in_progress = view_for_group(
            vec![compact_task(7, 9, 0, 42, l0_input(vec![sst(1, 10, 20)]))],
            9,
        );
        let same_sub_level = candidate(0, 42, sst(2, 15, 25));
        let different_sub_level = candidate(0, 43, sst(3, 15, 25));

        assert_eq!(in_progress.conflict_target_range(&same_sub_level), Some(7));
        assert_eq!(
            in_progress.conflict_target_range(&different_sub_level),
            None
        );
    }

    /// Building the view panics when two in-progress tasks overlap in one target container.
    #[test]
    #[should_panic(expected = "in-progress target ranges must not overlap")]
    fn assignment_projection_rejects_overlapping_target_ranges() {
        let assignments = vec![
            assignment(compact_task(7, 9, 4, 0, l0_input(vec![sst(1, 10, 20)]))),
            assignment(compact_task(8, 9, 4, 0, l0_input(vec![sst(2, 15, 25)]))),
        ];

        let _ = InProgressCompactionView::for_group(&assignments, 9.into());
    }

    /// Only tasks of the requested compaction group are projected, each into the bucket of
    /// its own target container.
    #[test]
    fn assignment_projection_filters_by_compaction_group_and_target_range() {
        let group_9_task = compact_task(7, 9, 4, 0, l0_input(vec![sst(1, 10, 20)]));
        let group_10_task = compact_task(8, 10, 4, 0, l0_input(vec![sst(2, 30, 40)]));
        let group_9_l0_task = compact_task(
            9,
            9,
            0,
            0,
            vec![input_level(0, LevelType::Overlapping, vec![sst(3, 50, 60)])],
        );
        let assignments = vec![
            assignment(group_9_task),
            assignment(group_10_task),
            assignment(group_9_l0_task),
        ];

        let projection = InProgressCompactionView::for_group(&assignments, 9.into());
        let group_9_range = candidate(4, 0, sst(4, 15, 16));
        let group_10_range = candidate(4, 0, sst(5, 35, 36));
        let group_9_l0_range = CompactionTargetRange::from_input_levels(
            0,
            0,
            &[input_level(0, LevelType::Overlapping, vec![sst(6, 55, 56)])],
        )
        .unwrap();

        assert_eq!(projection.conflict_target_range(&group_9_range), Some(7));
        assert_eq!(projection.conflict_target_range(&group_10_range), None);
        assert_eq!(projection.conflict_target_range(&group_9_l0_range), Some(9));
    }
}