risingwave_hummock_sdk/compaction_group/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod hummock_version_ext;
16
17use parse_display::Display;
18
19use crate::CompactionGroupId;
20
/// Alias for the `u32` table id type used by the helpers in this module
/// (e.g. `group_split::build_split_key`).
pub type StateTableId = u32;
22
23/// A compaction task's `StaticCompactionGroupId` indicates the compaction group that all its input
24/// SSTs belong to.
25#[derive(Display)]
26pub enum StaticCompactionGroupId {
27    /// Create a new compaction group.
28    NewCompactionGroup = 0,
29    /// All shared buffer local compaction task goes to here. Meta service will never see this
30    /// value. Note that currently we've restricted the compaction task's input by `via
31    /// compact_shared_buffer_by_compaction_group`
32    SharedBuffer = 1,
33    /// All states goes to here by default.
34    StateDefault = 2,
35    /// All MVs goes to here.
36    MaterializedView = 3,
37    /// Larger than any `StaticCompactionGroupId`.
38    End = 4,
39}
40
41impl From<StaticCompactionGroupId> for CompactionGroupId {
42    fn from(cg: StaticCompactionGroupId) -> Self {
43        cg as CompactionGroupId
44    }
45}
46
/// The split follows these rules:
/// 1. An SST whose key range contains `split_key` is split into two separate SSTs and their `key_range` is changed to `sst_1`: [`sst.key_range.left`, `split_key`) and `sst_2`: [`split_key`, `sst.key_range.right`].
/// 2. Currently only `vnode` 0 and `vnode` max are supported.
/// 3. Due to the above rule, a split key with `vnode` max is rewritten as (`table_id` + 1, `vnode` 0).
51pub mod group_split {
52    use std::cmp::Ordering;
53    use std::collections::BTreeSet;
54
55    use bytes::Bytes;
56    use risingwave_common::catalog::TableId;
57    use risingwave_common::hash::VirtualNode;
58    use risingwave_pb::hummock::PbLevelType;
59
60    use super::StateTableId;
61    use super::hummock_version_ext::insert_new_sub_level;
62    use crate::key::{FullKey, TableKey};
63    use crate::key_range::KeyRange;
64    use crate::level::{Level, Levels};
65    use crate::sstable_info::SstableInfo;
66    use crate::{HummockEpoch, KeyComparator, can_concat};
67
68    // By default, the split key is constructed with vnode = 0 and epoch = MAX, so that we can split table_id to the right group
69    pub fn build_split_key(table_id: StateTableId, vnode: VirtualNode) -> Bytes {
70        build_split_full_key(table_id, vnode).encode().into()
71    }
72
73    /// generate a full key for convenience to get the `table_id` and `vnode`
74    pub fn build_split_full_key(
75        mut table_id: StateTableId,
76        mut vnode: VirtualNode,
77    ) -> FullKey<Vec<u8>> {
78        if VirtualNode::MAX_REPRESENTABLE == vnode {
79            // Modify `table_id` to `next_table_id` to satisfy the `split_to_right`` rule, so that the `table_id`` originally passed in will be split to left.
80            table_id = table_id.strict_add(1);
81            vnode = VirtualNode::ZERO;
82        }
83
84        FullKey::new(
85            TableId::from(table_id),
86            TableKey(vnode.to_be_bytes().to_vec()),
87            HummockEpoch::MAX,
88        )
89    }
90
    /// Result of comparing an SST's key range with a split key (see `need_to_split`).
    #[derive(Debug, PartialEq, Clone)]
    pub enum SstSplitType {
        /// The split key is beyond the SST's right bound: the whole SST stays on the left side.
        Left,
        /// The split key is not greater than the SST's left bound: the whole SST goes to the
        /// right side.
        Right,
        /// The split key falls inside the SST's key range: the SST has to be split in two.
        Both,
    }
97
98    /// Determine whether the SST needs to be split, and if so, which side to split.
99    pub fn need_to_split(sst: &SstableInfo, split_key: Bytes) -> SstSplitType {
100        let key_range = &sst.key_range;
101        // 1. compare left
102        if KeyComparator::compare_encoded_full_key(&split_key, &key_range.left).is_le() {
103            return SstSplitType::Right;
104        }
105
106        // 2. compare right
107        if key_range.right_exclusive {
108            if KeyComparator::compare_encoded_full_key(&split_key, &key_range.right).is_ge() {
109                return SstSplitType::Left;
110            }
111        } else if KeyComparator::compare_encoded_full_key(&split_key, &key_range.right).is_gt() {
112            return SstSplitType::Left;
113        }
114
115        SstSplitType::Both
116    }
117
118    /// Split the SST into two parts based on the split key.
119    /// The left part is the original SST, and the right part is the new SST.
120    /// The split key is exclusive for the left part and inclusive for the right part.
121    /// The `table_ids` of the new SST are calculated based on the split key.
122    /// e.g.
123    ///  `sst.table_ids` = [1, 2, 3, 4, 5, 6, 7, 8, 9], `split_key` = (`table_id` = 5, vnode = 0)
124    ///  then the result:
125    /// sst1 {
126    ///     `sst_id`: `new_sst_id + 1`,
127    ///     `table_ids`: [1, 2, 3, 4],
128    ///     `key_range`: [left, `split_key`),
129    ///     `sst_size`: `left_size`,
130    /// }
131    /// sst2 {
132    ///    `sst_id`: `new_sst_id`,
133    ///    `table_ids`: [5, 6, 7, 8, 9],
134    ///    `key_range`: [`split_key`, right],
135    ///    `sst_size`: `right_size`,
136    /// }
137    pub fn split_sst(
138        origin_sst_info: SstableInfo,
139        new_sst_id: &mut u64,
140        split_key: Bytes,
141        left_size: u64,
142        right_size: u64,
143    ) -> (Option<SstableInfo>, Option<SstableInfo>) {
144        let mut origin_sst_info = origin_sst_info.get_inner();
145        let mut branch_table_info = origin_sst_info.clone();
146        branch_table_info.sst_id = *new_sst_id;
147        *new_sst_id += 1;
148        origin_sst_info.sst_id = *new_sst_id;
149        *new_sst_id += 1;
150
151        let (key_range_l, key_range_r) = {
152            let key_range = &origin_sst_info.key_range;
153            let l = KeyRange {
154                left: key_range.left.clone(),
155                right: split_key.clone(),
156                right_exclusive: true,
157            };
158
159            let r = KeyRange {
160                left: split_key.clone(),
161                right: key_range.right.clone(),
162                right_exclusive: key_range.right_exclusive,
163            };
164
165            (l, r)
166        };
167        let (table_ids_l, table_ids_r) =
168            split_table_ids_with_split_key(&origin_sst_info.table_ids, split_key.clone());
169
170        // rebuild the key_range and size and sstable file size
171        {
172            // origin_sst_info
173            origin_sst_info.key_range = key_range_l;
174            origin_sst_info.sst_size = std::cmp::max(1, left_size);
175            origin_sst_info.table_ids = table_ids_l;
176        }
177
178        {
179            // new sst
180            branch_table_info.key_range = key_range_r;
181            branch_table_info.sst_size = std::cmp::max(1, right_size);
182            branch_table_info.table_ids = table_ids_r;
183        }
184
185        // This function does not make any assumptions about the incoming sst, so add some judgement to ensure that the generated sst meets the restrictions.
186        if origin_sst_info.table_ids.is_empty() {
187            (None, Some(branch_table_info.into()))
188        } else if branch_table_info.table_ids.is_empty() {
189            (Some(origin_sst_info.into()), None)
190        } else if KeyComparator::compare_encoded_full_key(
191            &origin_sst_info.key_range.left,
192            &origin_sst_info.key_range.right,
193        )
194        .is_eq()
195        {
196            // avoid empty key_range of origin_sst
197            (None, Some(branch_table_info.into()))
198        } else {
199            (Some(origin_sst_info.into()), Some(branch_table_info.into()))
200        }
201    }
202
203    /// The old split sst logic with `table_ids`
204    /// This function is used to split the sst into two parts based on the `table_ids`.
205    /// In contrast to `split_sst`, this function does not modify the `key_range` and does not guarantee that the split ssts can be merged, which needs to be guaranteed by the caller.
206    pub fn split_sst_with_table_ids(
207        origin_sst_info: &SstableInfo,
208        new_sst_id: &mut u64,
209        old_sst_size: u64,
210        new_sst_size: u64,
211        new_table_ids: Vec<u32>,
212    ) -> (SstableInfo, SstableInfo) {
213        let mut sst_info = origin_sst_info.get_inner();
214        let mut branch_table_info = sst_info.clone();
215        branch_table_info.sst_id = *new_sst_id;
216        branch_table_info.sst_size = std::cmp::max(1, new_sst_size);
217        *new_sst_id += 1;
218
219        sst_info.sst_id = *new_sst_id;
220        sst_info.sst_size = std::cmp::max(1, old_sst_size);
221        *new_sst_id += 1;
222
223        {
224            // related github.com/risingwavelabs/risingwave/pull/17898/
225            // This is a temporary implementation that will update `table_ids`` based on the new split rule after PR 17898
226            // sst_info.table_ids = vec[1, 2, 3];
227            // new_table_ids = vec[2, 3, 4];
228            // branch_table_info.table_ids = vec[1, 2, 3] ∩ vec[2, 3, 4] = vec[2, 3]
229            let set1: BTreeSet<_> = sst_info.table_ids.iter().cloned().collect();
230            let set2: BTreeSet<_> = new_table_ids.into_iter().collect();
231            let intersection: Vec<_> = set1.intersection(&set2).cloned().collect();
232
233            // Update table_ids
234            branch_table_info.table_ids = intersection;
235            sst_info
236                .table_ids
237                .retain(|table_id| !branch_table_info.table_ids.contains(table_id));
238        }
239
240        (sst_info.into(), branch_table_info.into())
241    }
242
243    // Should avoid split same table_id into two groups
244    pub fn split_table_ids_with_split_key(
245        table_ids: &Vec<u32>,
246        split_key: Bytes,
247    ) -> (Vec<u32>, Vec<u32>) {
248        assert!(table_ids.is_sorted());
249        let split_full_key = FullKey::decode(&split_key);
250        let split_user_key = split_full_key.user_key;
251        let vnode = split_user_key.get_vnode_id();
252        let table_id = split_user_key.table_id.table_id();
253        split_table_ids_with_table_id_and_vnode(table_ids, table_id, vnode)
254    }
255
256    pub fn split_table_ids_with_table_id_and_vnode(
257        table_ids: &Vec<u32>,
258        table_id: StateTableId,
259        vnode: usize,
260    ) -> (Vec<u32>, Vec<u32>) {
261        assert!(table_ids.is_sorted());
262        assert_eq!(VirtualNode::ZERO, VirtualNode::from_index(vnode));
263        let pos = table_ids.partition_point(|&id| id < table_id);
264        (table_ids[..pos].to_vec(), table_ids[pos..].to_vec())
265    }
266
267    pub fn get_split_pos(sstables: &Vec<SstableInfo>, split_key: Bytes) -> usize {
268        sstables
269            .partition_point(|sst| {
270                KeyComparator::compare_encoded_full_key(&sst.key_range.left, &split_key).is_lt()
271            })
272            .saturating_sub(1)
273    }
274
275    /// Merge the right levels into the left levels.
276    pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) {
277        let right_l0 = right_levels.l0;
278
279        let mut max_left_sub_level_id = left_levels
280            .l0
281            .sub_levels
282            .iter()
283            .map(|sub_level| sub_level.sub_level_id + 1)
284            .max()
285            .unwrap_or(0); // If there are no sub levels, the max sub level id is 0.
286        let need_rewrite_right_sub_level_id = max_left_sub_level_id != 0;
287
288        for mut right_sub_level in right_l0.sub_levels {
289            // Rewrtie the sub level id of right sub level to avoid conflict with left sub levels. (conflict level type)
290            // e.g. left sub levels: [0, 1, 2], right sub levels: [0, 1, 2], after rewrite, right sub levels: [3, 4, 5]
291            if need_rewrite_right_sub_level_id {
292                right_sub_level.sub_level_id = max_left_sub_level_id;
293                max_left_sub_level_id += 1;
294            }
295
296            insert_new_sub_level(
297                &mut left_levels.l0,
298                right_sub_level.sub_level_id,
299                right_sub_level.level_type,
300                right_sub_level.table_infos,
301                None,
302            );
303        }
304
305        assert!(
306            left_levels
307                .l0
308                .sub_levels
309                .is_sorted_by_key(|sub_level| sub_level.sub_level_id),
310            "{}",
311            format!("left_levels.l0.sub_levels: {:?}", left_levels.l0.sub_levels)
312        );
313
314        // Reinitialise `vnode_partition_count` to avoid misaligned hierarchies
315        // caused by the merge of different compaction groups.(picker might reject the different `vnode_partition_count` sub_level to compact)
316        left_levels
317            .l0
318            .sub_levels
319            .iter_mut()
320            .for_each(|sub_level| sub_level.vnode_partition_count = 0);
321
322        for (idx, level) in right_levels.levels.into_iter().enumerate() {
323            if level.table_infos.is_empty() {
324                continue;
325            }
326
327            let insert_table_infos = level.table_infos;
328            left_levels.levels[idx].total_file_size += insert_table_infos
329                .iter()
330                .map(|sst| sst.sst_size)
331                .sum::<u64>();
332            left_levels.levels[idx].uncompressed_file_size += insert_table_infos
333                .iter()
334                .map(|sst| sst.uncompressed_file_size)
335                .sum::<u64>();
336
337            left_levels.levels[idx]
338                .table_infos
339                .extend(insert_table_infos);
340            left_levels.levels[idx]
341                .table_infos
342                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
343            assert!(
344                can_concat(&left_levels.levels[idx].table_infos),
345                "{}",
346                format!(
347                    "left-group {} right-group {} left_levels.levels[{}].table_infos: {:?} level_idx {:?}",
348                    left_levels.group_id,
349                    right_levels.group_id,
350                    idx,
351                    left_levels.levels[idx].table_infos,
352                    left_levels.levels[idx].level_idx
353                )
354            );
355        }
356    }
357
358    // When `insert_hint` is `Ok(idx)`, it means that the sub level `idx` in `target_l0`
359    // will extend these SSTs. When `insert_hint` is `Err(idx)`, it
360    // means that we will add a new sub level `idx` into `target_l0`.
361    pub fn get_sub_level_insert_hint(
362        target_levels: &Vec<Level>,
363        sub_level: &Level,
364    ) -> Result<usize, usize> {
365        for (idx, other) in target_levels.iter().enumerate() {
366            match other.sub_level_id.cmp(&sub_level.sub_level_id) {
367                Ordering::Less => {}
368                Ordering::Equal => {
369                    return Ok(idx);
370                }
371                Ordering::Greater => {
372                    return Err(idx);
373                }
374            }
375        }
376
377        Err(target_levels.len())
378    }
379
380    /// Split the SSTs in the level according to the split key.
381    pub fn split_sst_info_for_level_v2(
382        level: &mut Level,
383        new_sst_id: &mut u64,
384        split_key: Bytes,
385    ) -> Vec<SstableInfo> {
386        if level.table_infos.is_empty() {
387            return vec![];
388        }
389
390        let mut insert_table_infos = vec![];
391        if level.level_type == PbLevelType::Overlapping {
392            level.table_infos.retain_mut(|sst| {
393                let sst_split_type = need_to_split(sst, split_key.clone());
394                match sst_split_type {
395                    SstSplitType::Left => true,
396                    SstSplitType::Right => {
397                        insert_table_infos.push(sst.clone());
398                        false
399                    }
400                    SstSplitType::Both => {
401                        let sst_size = sst.sst_size;
402                        if sst_size / 2 == 0 {
403                            tracing::warn!(
404                                id = sst.sst_id,
405                                object_id = sst.object_id,
406                                sst_size = sst.sst_size,
407                                file_size = sst.file_size,
408                                "Sstable sst_size is under expected",
409                            );
410                        };
411
412                        let (left, right) = split_sst(
413                            sst.clone(),
414                            new_sst_id,
415                            split_key.clone(),
416                            sst_size / 2,
417                            sst_size / 2,
418                        );
419                        if let Some(branch_sst) = right {
420                            insert_table_infos.push(branch_sst);
421                        }
422
423                        if left.is_some() {
424                            *sst = left.unwrap();
425                            true
426                        } else {
427                            false
428                        }
429                    }
430                }
431            });
432        } else {
433            let pos = get_split_pos(&level.table_infos, split_key.clone());
434            if pos >= level.table_infos.len() {
435                return insert_table_infos;
436            }
437
438            let sst_split_type = need_to_split(&level.table_infos[pos], split_key.clone());
439            match sst_split_type {
440                SstSplitType::Left => {
441                    insert_table_infos.extend_from_slice(&level.table_infos[pos + 1..]);
442                    level.table_infos = level.table_infos[0..=pos].to_vec();
443                }
444                SstSplitType::Right => {
445                    assert_eq!(0, pos);
446                    insert_table_infos.extend_from_slice(&level.table_infos[pos..]); // the sst at pos has been split to the right
447                    level.table_infos.clear();
448                }
449                SstSplitType::Both => {
450                    // split the sst
451                    let sst = level.table_infos[pos].clone();
452                    let sst_size = sst.sst_size;
453                    if sst_size / 2 == 0 {
454                        tracing::warn!(
455                            id = sst.sst_id,
456                            object_id = sst.object_id,
457                            sst_size = sst.sst_size,
458                            file_size = sst.file_size,
459                            "Sstable sst_size is under expected",
460                        );
461                    };
462
463                    let (left, right) = split_sst(
464                        sst,
465                        new_sst_id,
466                        split_key.clone(),
467                        sst_size / 2,
468                        sst_size / 2,
469                    );
470
471                    if let Some(branch_sst) = right {
472                        insert_table_infos.push(branch_sst);
473                    }
474
475                    let right_start = pos + 1;
476                    let left_end = pos;
477                    // the sst at pos has been split to both left and right
478                    // the branched sst has been inserted to the `insert_table_infos`
479                    insert_table_infos.extend_from_slice(&level.table_infos[right_start..]);
480                    level.table_infos = level.table_infos[0..=left_end].to_vec();
481                    if let Some(origin_sst) = left {
482                        // replace the origin sst with the left part
483                        level.table_infos[left_end] = origin_sst;
484                    } else {
485                        level.table_infos.pop();
486                    }
487                }
488            };
489        }
490
491        insert_table_infos
492    }
493}
494
#[cfg(test)]
mod tests {
    use risingwave_common::hash::VirtualNode;

    use super::group_split::{
        build_split_key, split_table_ids_with_split_key, split_table_ids_with_table_id_and_vnode,
    };

    /// Splitting by (`table_id`, vnode 0) partitions the ids into `< table_id` and
    /// `>= table_id`, including split points outside of the id list.
    #[test]
    fn test_split_table_ids() {
        let table_ids = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];

        // (split table id, expected left part, expected right part)
        let cases: [(u32, Vec<u32>, Vec<u32>); 3] = [
            (5, vec![1, 2, 3, 4], vec![5, 6, 7, 8, 9]),
            // table_id not in the table_ids: larger than every id
            (10, table_ids.clone(), vec![]),
            // table_id not in the table_ids: smaller than every id
            (0, vec![], table_ids.clone()),
        ];

        for (split_table_id, expected_left, expected_right) in cases {
            let (left, right) = split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_table_id,
                VirtualNode::ZERO.to_index(),
            );
            assert_eq!(left, expected_left);
            assert_eq!(right, expected_right);
        }
    }

    /// The encoded split key carries the same table id boundary as the explicit variant.
    #[test]
    fn test_split_table_ids_with_split_key() {
        let table_ids = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let split_key = build_split_key(5, VirtualNode::ZERO);

        let (left, right) = split_table_ids_with_split_key(&table_ids, split_key);

        assert_eq!(left, vec![1, 2, 3, 4]);
        assert_eq!(right, vec![5, 6, 7, 8, 9]);
    }
}
539}