// risingwave_hummock_sdk/compaction_group/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod hummock_version_ext;

use parse_display::Display;
use risingwave_common::catalog::TableId;

use crate::CompactionGroupId;

/// Identifier of a state table; an alias of [`TableId`].
pub type StateTableId = TableId;

24/// A compaction task's `StaticCompactionGroupId` indicates the compaction group that all its input
25/// SSTs belong to.
26#[derive(Display)]
27pub enum StaticCompactionGroupId {
28    /// Create a new compaction group.
29    NewCompactionGroup = 0,
30    /// All shared buffer local compaction task goes to here. Meta service will never see this
31    /// value. Note that currently we've restricted the compaction task's input by `via
32    /// compact_shared_buffer_by_compaction_group`
33    SharedBuffer = 1,
34    /// All states goes to here by default.
35    StateDefault = 2,
36    /// All MVs goes to here.
37    MaterializedView = 3,
38    /// Larger than any `StaticCompactionGroupId`.
39    End = 4,
40}
41
42impl From<StaticCompactionGroupId> for CompactionGroupId {
43    fn from(cg: StaticCompactionGroupId) -> Self {
44        cg as CompactionGroupId
45    }
46}
47
48/// The split will follow the following rules:
49/// 1. Ssts with `split_key` will be split into two separate ssts and their `key_range` will be changed `sst_1`: [`sst.key_range.right`, `split_key`) `sst_2`: [`split_key`, `sst.key_range.right`].
50/// 2. Currently only `vnode` 0 and `vnode` max is supported.
51/// 3. Due to the above rule, `vnode` max will be rewritten as `table_id` + 1, vnode 0
52pub mod group_split {
53    use std::cmp::Ordering;
54    use std::collections::BTreeSet;
55
56    use bytes::Bytes;
57    use risingwave_common::catalog::TableId;
58    use risingwave_common::hash::VirtualNode;
59    use risingwave_pb::hummock::PbLevelType;
60
61    use super::StateTableId;
62    use super::hummock_version_ext::insert_new_sub_level;
63    use crate::key::{FullKey, TableKey};
64    use crate::key_range::KeyRange;
65    use crate::level::{Level, Levels};
66    use crate::sstable_info::SstableInfo;
67    use crate::{HummockEpoch, HummockSstableId, KeyComparator, can_concat};
68
69    // By default, the split key is constructed with vnode = 0 and epoch = MAX, so that we can split table_id to the right group
70    pub fn build_split_key(table_id: StateTableId, vnode: VirtualNode) -> Bytes {
71        build_split_full_key(table_id, vnode).encode().into()
72    }
73
74    /// generate a full key for convenience to get the `table_id` and `vnode`
75    pub fn build_split_full_key(
76        mut table_id: StateTableId,
77        mut vnode: VirtualNode,
78    ) -> FullKey<Vec<u8>> {
79        if VirtualNode::MAX_REPRESENTABLE == vnode {
80            // Modify `table_id` to `next_table_id` to satisfy the `split_to_right`` rule, so that the `table_id`` originally passed in will be split to left.
81            table_id = table_id.as_raw_id().strict_add(1).into();
82            vnode = VirtualNode::ZERO;
83        }
84
85        FullKey::new(
86            table_id,
87            TableKey(vnode.to_be_bytes().to_vec()),
88            HummockEpoch::MAX,
89        )
90    }
91
92    #[derive(Debug, PartialEq, Clone)]
93    pub enum SstSplitType {
94        Left,
95        Right,
96        Both,
97    }
98
99    /// Determine whether the SST needs to be split, and if so, which side to split.
100    pub fn need_to_split(sst: &SstableInfo, split_key: Bytes) -> SstSplitType {
101        let key_range = &sst.key_range;
102        // 1. compare left
103        if KeyComparator::compare_encoded_full_key(&split_key, &key_range.left).is_le() {
104            return SstSplitType::Right;
105        }
106
107        // 2. compare right
108        if key_range.right_exclusive {
109            if KeyComparator::compare_encoded_full_key(&split_key, &key_range.right).is_ge() {
110                return SstSplitType::Left;
111            }
112        } else if KeyComparator::compare_encoded_full_key(&split_key, &key_range.right).is_gt() {
113            return SstSplitType::Left;
114        }
115
116        SstSplitType::Both
117    }
118
119    /// Split the SST into two parts based on the split key.
120    /// The left part is the original SST, and the right part is the new SST.
121    /// The split key is exclusive for the left part and inclusive for the right part.
122    /// The `table_ids` of the new SST are calculated based on the split key.
123    /// e.g.
124    ///  `sst.table_ids` = [1, 2, 3, 4, 5, 6, 7, 8, 9], `split_key` = (`table_id` = 5, vnode = 0)
125    ///  then the result:
126    /// sst1 {
127    ///     `sst_id`: `new_sst_id + 1`,
128    ///     `table_ids`: [1, 2, 3, 4],
129    ///     `key_range`: [left, `split_key`),
130    ///     `sst_size`: `left_size`,
131    /// }
132    /// sst2 {
133    ///    `sst_id`: `new_sst_id`,
134    ///    `table_ids`: [5, 6, 7, 8, 9],
135    ///    `key_range`: [`split_key`, right],
136    ///    `sst_size`: `right_size`,
137    /// }
138    pub fn split_sst(
139        origin_sst_info: SstableInfo,
140        new_sst_id: &mut HummockSstableId,
141        split_key: Bytes,
142        left_size: u64,
143        right_size: u64,
144    ) -> (Option<SstableInfo>, Option<SstableInfo>) {
145        let mut origin_sst_info = origin_sst_info.get_inner();
146        let mut branch_table_info = origin_sst_info.clone();
147        branch_table_info.sst_id = *new_sst_id;
148        *new_sst_id += 1;
149        origin_sst_info.sst_id = *new_sst_id;
150        *new_sst_id += 1;
151
152        let (key_range_l, key_range_r) = {
153            let key_range = &origin_sst_info.key_range;
154            let l = KeyRange {
155                left: key_range.left.clone(),
156                right: split_key.clone(),
157                right_exclusive: true,
158            };
159
160            let r = KeyRange {
161                left: split_key.clone(),
162                right: key_range.right.clone(),
163                right_exclusive: key_range.right_exclusive,
164            };
165
166            (l, r)
167        };
168        let (table_ids_l, table_ids_r) =
169            split_table_ids_with_split_key(&origin_sst_info.table_ids, split_key);
170
171        // rebuild the key_range and size and sstable file size
172        {
173            // origin_sst_info
174            origin_sst_info.key_range = key_range_l;
175            origin_sst_info.sst_size = std::cmp::max(1, left_size);
176            origin_sst_info.table_ids = table_ids_l;
177        }
178
179        {
180            // new sst
181            branch_table_info.key_range = key_range_r;
182            branch_table_info.sst_size = std::cmp::max(1, right_size);
183            branch_table_info.table_ids = table_ids_r;
184        }
185
186        // This function does not make any assumptions about the incoming sst, so add some judgement to ensure that the generated sst meets the restrictions.
187        if origin_sst_info.table_ids.is_empty() {
188            (None, Some(branch_table_info.into()))
189        } else if branch_table_info.table_ids.is_empty() {
190            (Some(origin_sst_info.into()), None)
191        } else if KeyComparator::compare_encoded_full_key(
192            &origin_sst_info.key_range.left,
193            &origin_sst_info.key_range.right,
194        )
195        .is_eq()
196        {
197            // avoid empty key_range of origin_sst
198            (None, Some(branch_table_info.into()))
199        } else {
200            (Some(origin_sst_info.into()), Some(branch_table_info.into()))
201        }
202    }
203
204    /// The old split sst logic with `table_ids`
205    /// This function is used to split the sst into two parts based on the `table_ids`.
206    /// In contrast to `split_sst`, this function does not modify the `key_range` and does not guarantee that the split ssts can be merged, which needs to be guaranteed by the caller.
207    pub fn split_sst_with_table_ids(
208        origin_sst_info: &SstableInfo,
209        new_sst_id: &mut HummockSstableId,
210        old_sst_size: u64,
211        new_sst_size: u64,
212        new_table_ids: Vec<TableId>,
213    ) -> (SstableInfo, SstableInfo) {
214        let mut sst_info = origin_sst_info.get_inner();
215        let mut branch_table_info = sst_info.clone();
216        branch_table_info.sst_id = *new_sst_id;
217        branch_table_info.sst_size = std::cmp::max(1, new_sst_size);
218        *new_sst_id += 1;
219
220        sst_info.sst_id = *new_sst_id;
221        sst_info.sst_size = std::cmp::max(1, old_sst_size);
222        *new_sst_id += 1;
223
224        {
225            // related github.com/risingwavelabs/risingwave/pull/17898/
226            // This is a temporary implementation that will update `table_ids`` based on the new split rule after PR 17898
227            // sst_info.table_ids = vec[1, 2, 3];
228            // new_table_ids = vec[2, 3, 4];
229            // branch_table_info.table_ids = vec[1, 2, 3] ∩ vec[2, 3, 4] = vec[2, 3]
230            let set1: BTreeSet<_> = sst_info.table_ids.iter().cloned().collect();
231            let set2: BTreeSet<_> = new_table_ids.into_iter().collect();
232            let intersection: Vec<_> = set1.intersection(&set2).cloned().collect();
233
234            // Update table_ids
235            branch_table_info.table_ids = intersection;
236            sst_info
237                .table_ids
238                .retain(|table_id| !branch_table_info.table_ids.contains(table_id));
239        }
240
241        (sst_info.into(), branch_table_info.into())
242    }
243
244    // Should avoid split same table_id into two groups
245    pub fn split_table_ids_with_split_key(
246        table_ids: &Vec<TableId>,
247        split_key: Bytes,
248    ) -> (Vec<TableId>, Vec<TableId>) {
249        assert!(table_ids.is_sorted());
250        let split_full_key = FullKey::decode(&split_key);
251        let split_user_key = split_full_key.user_key;
252        let vnode = split_user_key.get_vnode_id();
253        let table_id = split_user_key.table_id;
254        split_table_ids_with_table_id_and_vnode(table_ids, table_id, vnode)
255    }
256
257    pub fn split_table_ids_with_table_id_and_vnode(
258        table_ids: &Vec<TableId>,
259        table_id: StateTableId,
260        vnode: usize,
261    ) -> (Vec<TableId>, Vec<TableId>) {
262        assert!(table_ids.is_sorted());
263        assert_eq!(VirtualNode::ZERO, VirtualNode::from_index(vnode));
264        let pos = table_ids.partition_point(|&id| id < table_id);
265        (table_ids[..pos].to_vec(), table_ids[pos..].to_vec())
266    }
267
268    pub fn get_split_pos(sstables: &Vec<SstableInfo>, split_key: Bytes) -> usize {
269        sstables
270            .partition_point(|sst| {
271                KeyComparator::compare_encoded_full_key(&sst.key_range.left, &split_key).is_lt()
272            })
273            .saturating_sub(1)
274    }
275
276    /// Merge the right levels into the left levels.
277    pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) {
278        let right_l0 = right_levels.l0;
279
280        let mut max_left_sub_level_id = left_levels
281            .l0
282            .sub_levels
283            .iter()
284            .map(|sub_level| sub_level.sub_level_id + 1)
285            .max()
286            .unwrap_or(0); // If there are no sub levels, the max sub level id is 0.
287        let need_rewrite_right_sub_level_id = max_left_sub_level_id != 0;
288
289        for mut right_sub_level in right_l0.sub_levels {
290            // Rewrtie the sub level id of right sub level to avoid conflict with left sub levels. (conflict level type)
291            // e.g. left sub levels: [0, 1, 2], right sub levels: [0, 1, 2], after rewrite, right sub levels: [3, 4, 5]
292            if need_rewrite_right_sub_level_id {
293                right_sub_level.sub_level_id = max_left_sub_level_id;
294                max_left_sub_level_id += 1;
295            }
296
297            insert_new_sub_level(
298                &mut left_levels.l0,
299                right_sub_level.sub_level_id,
300                right_sub_level.level_type,
301                right_sub_level.table_infos,
302                None,
303            );
304        }
305
306        assert!(
307            left_levels
308                .l0
309                .sub_levels
310                .is_sorted_by_key(|sub_level| sub_level.sub_level_id),
311            "{}",
312            format!("left_levels.l0.sub_levels: {:?}", left_levels.l0.sub_levels)
313        );
314
315        // Reinitialise `vnode_partition_count` to avoid misaligned hierarchies
316        // caused by the merge of different compaction groups.(picker might reject the different `vnode_partition_count` sub_level to compact)
317        left_levels
318            .l0
319            .sub_levels
320            .iter_mut()
321            .for_each(|sub_level| sub_level.vnode_partition_count = 0);
322
323        for (idx, level) in right_levels.levels.into_iter().enumerate() {
324            if level.table_infos.is_empty() {
325                continue;
326            }
327
328            let insert_table_infos = level.table_infos;
329            left_levels.levels[idx].total_file_size += insert_table_infos
330                .iter()
331                .map(|sst| sst.sst_size)
332                .sum::<u64>();
333            left_levels.levels[idx].uncompressed_file_size += insert_table_infos
334                .iter()
335                .map(|sst| sst.uncompressed_file_size)
336                .sum::<u64>();
337
338            left_levels.levels[idx]
339                .table_infos
340                .extend(insert_table_infos);
341            left_levels.levels[idx]
342                .table_infos
343                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
344            assert!(
345                can_concat(&left_levels.levels[idx].table_infos),
346                "{}",
347                format!(
348                    "left-group {} right-group {} left_levels.levels[{}].table_infos: {:?} level_idx {:?}",
349                    left_levels.group_id,
350                    right_levels.group_id,
351                    idx,
352                    left_levels.levels[idx].table_infos,
353                    left_levels.levels[idx].level_idx
354                )
355            );
356        }
357    }
358
359    // When `insert_hint` is `Ok(idx)`, it means that the sub level `idx` in `target_l0`
360    // will extend these SSTs. When `insert_hint` is `Err(idx)`, it
361    // means that we will add a new sub level `idx` into `target_l0`.
362    pub fn get_sub_level_insert_hint(
363        target_levels: &Vec<Level>,
364        sub_level: &Level,
365    ) -> Result<usize, usize> {
366        for (idx, other) in target_levels.iter().enumerate() {
367            match other.sub_level_id.cmp(&sub_level.sub_level_id) {
368                Ordering::Less => {}
369                Ordering::Equal => {
370                    return Ok(idx);
371                }
372                Ordering::Greater => {
373                    return Err(idx);
374                }
375            }
376        }
377
378        Err(target_levels.len())
379    }
380
381    /// Split the SSTs in the level according to the split key.
382    pub fn split_sst_info_for_level_v2(
383        level: &mut Level,
384        new_sst_id: &mut HummockSstableId,
385        split_key: Bytes,
386    ) -> Vec<SstableInfo> {
387        if level.table_infos.is_empty() {
388            return vec![];
389        }
390
391        let mut insert_table_infos = vec![];
392        if level.level_type == PbLevelType::Overlapping {
393            level.table_infos.retain_mut(|sst| {
394                let sst_split_type = need_to_split(sst, split_key.clone());
395                match sst_split_type {
396                    SstSplitType::Left => true,
397                    SstSplitType::Right => {
398                        insert_table_infos.push(sst.clone());
399                        false
400                    }
401                    SstSplitType::Both => {
402                        let sst_size = sst.sst_size;
403                        if sst_size / 2 == 0 {
404                            tracing::warn!(
405                                id = %sst.sst_id,
406                                object_id = %sst.object_id,
407                                sst_size = sst.sst_size,
408                                file_size = sst.file_size,
409                                "Sstable sst_size is under expected",
410                            );
411                        };
412
413                        let (left, right) = split_sst(
414                            sst.clone(),
415                            new_sst_id,
416                            split_key.clone(),
417                            sst_size / 2,
418                            sst_size / 2,
419                        );
420                        if let Some(branch_sst) = right {
421                            insert_table_infos.push(branch_sst);
422                        }
423
424                        if let Some(s) = left {
425                            *sst = s;
426                            true
427                        } else {
428                            false
429                        }
430                    }
431                }
432            });
433        } else {
434            let pos = get_split_pos(&level.table_infos, split_key.clone());
435            if pos >= level.table_infos.len() {
436                return insert_table_infos;
437            }
438
439            let sst_split_type = need_to_split(&level.table_infos[pos], split_key.clone());
440            match sst_split_type {
441                SstSplitType::Left => {
442                    insert_table_infos.extend_from_slice(&level.table_infos[pos + 1..]);
443                    level.table_infos = level.table_infos[0..=pos].to_vec();
444                }
445                SstSplitType::Right => {
446                    assert_eq!(0, pos);
447                    insert_table_infos.extend_from_slice(&level.table_infos[pos..]); // the sst at pos has been split to the right
448                    level.table_infos.clear();
449                }
450                SstSplitType::Both => {
451                    // split the sst
452                    let sst = level.table_infos[pos].clone();
453                    let sst_size = sst.sst_size;
454                    if sst_size / 2 == 0 {
455                        tracing::warn!(
456                            id = %sst.sst_id,
457                            object_id = %sst.object_id,
458                            sst_size = sst.sst_size,
459                            file_size = sst.file_size,
460                            "Sstable sst_size is under expected",
461                        );
462                    };
463
464                    let (left, right) = split_sst(
465                        sst,
466                        new_sst_id,
467                        split_key.clone(),
468                        sst_size / 2,
469                        sst_size / 2,
470                    );
471
472                    if let Some(branch_sst) = right {
473                        insert_table_infos.push(branch_sst);
474                    }
475
476                    let right_start = pos + 1;
477                    let left_end = pos;
478                    // the sst at pos has been split to both left and right
479                    // the branched sst has been inserted to the `insert_table_infos`
480                    insert_table_infos.extend_from_slice(&level.table_infos[right_start..]);
481                    level.table_infos = level.table_infos[0..=left_end].to_vec();
482                    if let Some(origin_sst) = left {
483                        // replace the origin sst with the left part
484                        level.table_infos[left_end] = origin_sst;
485                    } else {
486                        level.table_infos.pop();
487                    }
488                }
489            };
490        }
491
492        insert_table_infos
493    }
494}
495
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;

    /// Convert raw integer ids into a `Vec<TableId>`.
    fn to_table_ids<I>(raw_ids: I) -> Vec<TableId>
    where
        I: IntoIterator,
        I::Item: Into<TableId>,
    {
        raw_ids.into_iter().map(Into::into).collect_vec()
    }

    #[test]
    fn test_split_table_ids() {
        let table_ids = to_table_ids([1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let (left, right) = super::group_split::split_table_ids_with_table_id_and_vnode(
            &table_ids,
            5.into(),
            VirtualNode::ZERO.to_index(),
        );
        assert_eq!(left, to_table_ids([1, 2, 3, 4]));
        assert_eq!(right, to_table_ids([5, 6, 7, 8, 9]));

        // test table_id not in the table_ids
        let (left, right) = super::group_split::split_table_ids_with_table_id_and_vnode(
            &table_ids,
            10.into(),
            VirtualNode::ZERO.to_index(),
        );
        assert_eq!(left, to_table_ids([1, 2, 3, 4, 5, 6, 7, 8, 9]));
        assert!(right.is_empty());

        let (left, right) = super::group_split::split_table_ids_with_table_id_and_vnode(
            &table_ids,
            0.into(),
            VirtualNode::ZERO.to_index(),
        );
        assert!(left.is_empty());
        assert_eq!(right, to_table_ids([1, 2, 3, 4, 5, 6, 7, 8, 9]));
    }

    #[test]
    fn test_split_table_ids_with_split_key() {
        let table_ids = to_table_ids([1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let split_key = super::group_split::build_split_key(5.into(), VirtualNode::ZERO);
        let (left, right) =
            super::group_split::split_table_ids_with_split_key(&table_ids, split_key);
        assert_eq!(left, to_table_ids([1, 2, 3, 4]));
        assert_eq!(right, to_table_ids([5, 6, 7, 8, 9]));
    }

    #[test]
    fn test_split_table_ids_with_max_vnode_split_key() {
        // A split key built with the max vnode is rewritten to (`table_id` + 1, vnode 0), so the
        // table passed in stays entirely in the left group.
        let table_ids = to_table_ids([1, 2, 3, 4, 5, 6, 7, 8, 9]);
        let split_key =
            super::group_split::build_split_key(5.into(), VirtualNode::MAX_REPRESENTABLE);
        let (left, right) =
            super::group_split::split_table_ids_with_split_key(&table_ids, split_key);
        assert_eq!(left, to_table_ids([1, 2, 3, 4, 5]));
        assert_eq!(right, to_table_ids([6, 7, 8, 9]));
    }
}