risingwave_hummock_sdk/compaction_group/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod hummock_version_ext;
16
17use risingwave_common::catalog::TableId;
18
/// Alias of [`TableId`], used by the compaction-group split helpers in this file
/// (e.g. `build_split_key`).
pub type StateTableId = TableId;
20
21/// A compaction task's `StaticCompactionGroupId` indicates the compaction group that all its input
22/// SSTs belong to.
23#[expect(non_upper_case_globals)]
24#[expect(non_snake_case)]
25pub mod StaticCompactionGroupId {
26    use risingwave_pb::id::CompactionGroupId;
27
28    /// Create a new compaction group.
29    pub const NewCompactionGroup: CompactionGroupId = CompactionGroupId::new(0);
30    /// All shared buffer local compaction task goes to here. Meta service will never see this
31    /// value. Note that currently we've restricted the compaction task's input by `via
32    /// compact_shared_buffer_by_compaction_group`
33    pub const SharedBuffer: CompactionGroupId = CompactionGroupId::new(1);
34    /// All states goes to here by default.
35    pub const StateDefault: CompactionGroupId = CompactionGroupId::new(2);
36    /// All MVs goes to here.
37    pub const MaterializedView: CompactionGroupId = CompactionGroupId::new(3);
38    /// Larger than any `StaticCompactionGroupId`.
39    pub const End: CompactionGroupId = CompactionGroupId::new(4);
40}
41
/// The split follows these rules:
/// 1. An SST whose key range contains `split_key` is split into two separate SSTs, whose `key_range`s
///    become `sst_1`: [`sst.key_range.left`, `split_key`) and `sst_2`: [`split_key`, `sst.key_range.right`].
/// 2. Currently only `vnode` 0 and `vnode` max are supported.
/// 3. Because of rule 2, a split key with `vnode` max is rewritten as (`table_id` + 1, `vnode` 0).
46pub mod group_split {
47    use std::cmp::Ordering;
48    use std::collections::BTreeSet;
49
50    use bytes::Bytes;
51    use risingwave_common::catalog::TableId;
52    use risingwave_common::hash::VirtualNode;
53    use risingwave_pb::hummock::PbLevelType;
54
55    use super::StateTableId;
56    use super::hummock_version_ext::insert_new_sub_level;
57    use crate::key::{FullKey, TableKey};
58    use crate::key_range::KeyRange;
59    use crate::level::{Level, Levels};
60    use crate::sstable_info::SstableInfo;
61    use crate::{HummockEpoch, HummockSstableId, KeyComparator, can_concat};
62
63    // By default, the split key is constructed with vnode = 0 and epoch = MAX, so that we can split table_id to the right group
64    pub fn build_split_key(table_id: StateTableId, vnode: VirtualNode) -> Bytes {
65        build_split_full_key(table_id, vnode).encode().into()
66    }
67
68    /// generate a full key for convenience to get the `table_id` and `vnode`
69    pub fn build_split_full_key(
70        mut table_id: StateTableId,
71        mut vnode: VirtualNode,
72    ) -> FullKey<Vec<u8>> {
73        if VirtualNode::MAX_REPRESENTABLE == vnode {
74            // Modify `table_id` to `next_table_id` to satisfy the `split_to_right`` rule, so that the `table_id`` originally passed in will be split to left.
75            table_id = table_id.as_raw_id().strict_add(1).into();
76            vnode = VirtualNode::ZERO;
77        }
78
79        FullKey::new(
80            table_id,
81            TableKey(vnode.to_be_bytes().to_vec()),
82            HummockEpoch::MAX,
83        )
84    }
85
86    #[derive(Debug, PartialEq, Clone)]
87    pub enum SstSplitType {
88        Left,
89        Right,
90        Both,
91    }
92
93    /// Determine whether the SST needs to be split, and if so, which side to split.
94    pub fn need_to_split(sst: &SstableInfo, split_key: Bytes) -> SstSplitType {
95        let key_range = &sst.key_range;
96        // 1. compare left
97        if KeyComparator::compare_encoded_full_key(&split_key, &key_range.left).is_le() {
98            return SstSplitType::Right;
99        }
100
101        // 2. compare right
102        if key_range.right_exclusive {
103            if KeyComparator::compare_encoded_full_key(&split_key, &key_range.right).is_ge() {
104                return SstSplitType::Left;
105            }
106        } else if KeyComparator::compare_encoded_full_key(&split_key, &key_range.right).is_gt() {
107            return SstSplitType::Left;
108        }
109
110        SstSplitType::Both
111    }
112
113    /// Split the SST into two parts based on the split key.
114    /// The left part is the original SST, and the right part is the new SST.
115    /// The split key is exclusive for the left part and inclusive for the right part.
116    /// The `table_ids` of the new SST are calculated based on the split key.
117    /// e.g.
118    ///  `sst.table_ids` = [1, 2, 3, 4, 5, 6, 7, 8, 9], `split_key` = (`table_id` = 5, vnode = 0)
119    ///  then the result:
120    /// sst1 {
121    ///     `sst_id`: `new_sst_id + 1`,
122    ///     `table_ids`: [1, 2, 3, 4],
123    ///     `key_range`: [left, `split_key`),
124    ///     `sst_size`: `left_size`,
125    /// }
126    /// sst2 {
127    ///    `sst_id`: `new_sst_id`,
128    ///    `table_ids`: [5, 6, 7, 8, 9],
129    ///    `key_range`: [`split_key`, right],
130    ///    `sst_size`: `right_size`,
131    /// }
132    pub fn split_sst(
133        origin_sst_info: SstableInfo,
134        new_sst_id: &mut HummockSstableId,
135        split_key: Bytes,
136        left_size: u64,
137        right_size: u64,
138    ) -> (Option<SstableInfo>, Option<SstableInfo>) {
139        let mut origin_sst_info = origin_sst_info.get_inner();
140        let mut branch_table_info = origin_sst_info.clone();
141        branch_table_info.sst_id = *new_sst_id;
142        *new_sst_id += 1;
143        origin_sst_info.sst_id = *new_sst_id;
144        *new_sst_id += 1;
145
146        let (key_range_l, key_range_r) = {
147            let key_range = &origin_sst_info.key_range;
148            let l = KeyRange {
149                left: key_range.left.clone(),
150                right: split_key.clone(),
151                right_exclusive: true,
152            };
153
154            let r = KeyRange {
155                left: split_key.clone(),
156                right: key_range.right.clone(),
157                right_exclusive: key_range.right_exclusive,
158            };
159
160            (l, r)
161        };
162        let (table_ids_l, table_ids_r) =
163            split_table_ids_with_split_key(&origin_sst_info.table_ids, split_key);
164
165        // rebuild the key_range and size and sstable file size
166        {
167            // origin_sst_info
168            origin_sst_info.key_range = key_range_l;
169            origin_sst_info.sst_size = std::cmp::max(1, left_size);
170            origin_sst_info.table_ids = table_ids_l;
171        }
172
173        {
174            // new sst
175            branch_table_info.key_range = key_range_r;
176            branch_table_info.sst_size = std::cmp::max(1, right_size);
177            branch_table_info.table_ids = table_ids_r;
178        }
179
180        // This function does not make any assumptions about the incoming sst, so add some judgement to ensure that the generated sst meets the restrictions.
181        if origin_sst_info.table_ids.is_empty() {
182            (None, Some(branch_table_info.into()))
183        } else if branch_table_info.table_ids.is_empty() {
184            (Some(origin_sst_info.into()), None)
185        } else if KeyComparator::compare_encoded_full_key(
186            &origin_sst_info.key_range.left,
187            &origin_sst_info.key_range.right,
188        )
189        .is_eq()
190        {
191            // avoid empty key_range of origin_sst
192            (None, Some(branch_table_info.into()))
193        } else {
194            (Some(origin_sst_info.into()), Some(branch_table_info.into()))
195        }
196    }
197
198    /// The old split sst logic with `table_ids`
199    /// This function is used to split the sst into two parts based on the `table_ids`.
200    /// In contrast to `split_sst`, this function does not modify the `key_range` and does not guarantee that the split ssts can be merged, which needs to be guaranteed by the caller.
201    pub fn split_sst_with_table_ids(
202        origin_sst_info: &SstableInfo,
203        new_sst_id: &mut HummockSstableId,
204        old_sst_size: u64,
205        new_sst_size: u64,
206        new_table_ids: Vec<TableId>,
207    ) -> (SstableInfo, SstableInfo) {
208        let mut sst_info = origin_sst_info.get_inner();
209        let mut branch_table_info = sst_info.clone();
210        branch_table_info.sst_id = *new_sst_id;
211        branch_table_info.sst_size = std::cmp::max(1, new_sst_size);
212        *new_sst_id += 1;
213
214        sst_info.sst_id = *new_sst_id;
215        sst_info.sst_size = std::cmp::max(1, old_sst_size);
216        *new_sst_id += 1;
217
218        {
219            // related github.com/risingwavelabs/risingwave/pull/17898/
220            // This is a temporary implementation that will update `table_ids`` based on the new split rule after PR 17898
221            // sst_info.table_ids = vec[1, 2, 3];
222            // new_table_ids = vec[2, 3, 4];
223            // branch_table_info.table_ids = vec[1, 2, 3] ∩ vec[2, 3, 4] = vec[2, 3]
224            let set1: BTreeSet<_> = sst_info.table_ids.iter().cloned().collect();
225            let set2: BTreeSet<_> = new_table_ids.into_iter().collect();
226            let intersection: Vec<_> = set1.intersection(&set2).cloned().collect();
227
228            // Update table_ids
229            branch_table_info.table_ids = intersection;
230            sst_info
231                .table_ids
232                .retain(|table_id| !branch_table_info.table_ids.contains(table_id));
233        }
234
235        (sst_info.into(), branch_table_info.into())
236    }
237
238    // Should avoid split same table_id into two groups
239    pub fn split_table_ids_with_split_key(
240        table_ids: &Vec<TableId>,
241        split_key: Bytes,
242    ) -> (Vec<TableId>, Vec<TableId>) {
243        assert!(table_ids.is_sorted());
244        let split_full_key = FullKey::decode(&split_key);
245        let split_user_key = split_full_key.user_key;
246        let vnode = split_user_key.get_vnode_id();
247        let table_id = split_user_key.table_id;
248        split_table_ids_with_table_id_and_vnode(table_ids, table_id, vnode)
249    }
250
251    pub fn split_table_ids_with_table_id_and_vnode(
252        table_ids: &Vec<TableId>,
253        table_id: StateTableId,
254        vnode: usize,
255    ) -> (Vec<TableId>, Vec<TableId>) {
256        assert!(table_ids.is_sorted());
257        assert_eq!(VirtualNode::ZERO, VirtualNode::from_index(vnode));
258        let pos = table_ids.partition_point(|&id| id < table_id);
259        (table_ids[..pos].to_vec(), table_ids[pos..].to_vec())
260    }
261
262    pub fn get_split_pos(sstables: &Vec<SstableInfo>, split_key: Bytes) -> usize {
263        sstables
264            .partition_point(|sst| {
265                KeyComparator::compare_encoded_full_key(&sst.key_range.left, &split_key).is_lt()
266            })
267            .saturating_sub(1)
268    }
269
270    /// Merge the right levels into the left levels.
271    pub fn merge_levels(left_levels: &mut Levels, right_levels: Levels) {
272        let right_l0 = right_levels.l0;
273
274        let mut max_left_sub_level_id = left_levels
275            .l0
276            .sub_levels
277            .iter()
278            .map(|sub_level| sub_level.sub_level_id + 1)
279            .max()
280            .unwrap_or(0); // If there are no sub levels, the max sub level id is 0.
281        let need_rewrite_right_sub_level_id = max_left_sub_level_id != 0;
282
283        for mut right_sub_level in right_l0.sub_levels {
284            // Rewrtie the sub level id of right sub level to avoid conflict with left sub levels. (conflict level type)
285            // e.g. left sub levels: [0, 1, 2], right sub levels: [0, 1, 2], after rewrite, right sub levels: [3, 4, 5]
286            if need_rewrite_right_sub_level_id {
287                right_sub_level.sub_level_id = max_left_sub_level_id;
288                max_left_sub_level_id += 1;
289            }
290
291            insert_new_sub_level(
292                &mut left_levels.l0,
293                right_sub_level.sub_level_id,
294                right_sub_level.level_type,
295                right_sub_level.table_infos,
296                None,
297            );
298        }
299
300        assert!(
301            left_levels
302                .l0
303                .sub_levels
304                .is_sorted_by_key(|sub_level| sub_level.sub_level_id),
305            "{}",
306            format!("left_levels.l0.sub_levels: {:?}", left_levels.l0.sub_levels)
307        );
308
309        // Reinitialise `vnode_partition_count` to avoid misaligned hierarchies
310        // caused by the merge of different compaction groups.(picker might reject the different `vnode_partition_count` sub_level to compact)
311        left_levels
312            .l0
313            .sub_levels
314            .iter_mut()
315            .for_each(|sub_level| sub_level.vnode_partition_count = 0);
316
317        for (idx, level) in right_levels.levels.into_iter().enumerate() {
318            if level.table_infos.is_empty() {
319                continue;
320            }
321
322            let insert_table_infos = level.table_infos;
323            left_levels.levels[idx].total_file_size += insert_table_infos
324                .iter()
325                .map(|sst| sst.sst_size)
326                .sum::<u64>();
327            left_levels.levels[idx].uncompressed_file_size += insert_table_infos
328                .iter()
329                .map(|sst| sst.uncompressed_file_size)
330                .sum::<u64>();
331
332            left_levels.levels[idx]
333                .table_infos
334                .extend(insert_table_infos);
335            left_levels.levels[idx]
336                .table_infos
337                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
338            assert!(
339                can_concat(&left_levels.levels[idx].table_infos),
340                "{}",
341                format!(
342                    "left-group {} right-group {} left_levels.levels[{}].table_infos: {:?} level_idx {:?}",
343                    left_levels.group_id,
344                    right_levels.group_id,
345                    idx,
346                    left_levels.levels[idx].table_infos,
347                    left_levels.levels[idx].level_idx
348                )
349            );
350        }
351    }
352
353    // When `insert_hint` is `Ok(idx)`, it means that the sub level `idx` in `target_l0`
354    // will extend these SSTs. When `insert_hint` is `Err(idx)`, it
355    // means that we will add a new sub level `idx` into `target_l0`.
356    pub fn get_sub_level_insert_hint(
357        target_levels: &Vec<Level>,
358        sub_level: &Level,
359    ) -> Result<usize, usize> {
360        for (idx, other) in target_levels.iter().enumerate() {
361            match other.sub_level_id.cmp(&sub_level.sub_level_id) {
362                Ordering::Less => {}
363                Ordering::Equal => {
364                    return Ok(idx);
365                }
366                Ordering::Greater => {
367                    return Err(idx);
368                }
369            }
370        }
371
372        Err(target_levels.len())
373    }
374
375    /// Split the SSTs in the level according to the split key.
376    pub fn split_sst_info_for_level_v2(
377        level: &mut Level,
378        new_sst_id: &mut HummockSstableId,
379        split_key: Bytes,
380    ) -> Vec<SstableInfo> {
381        if level.table_infos.is_empty() {
382            return vec![];
383        }
384
385        let mut insert_table_infos = vec![];
386        if level.level_type == PbLevelType::Overlapping {
387            level.table_infos.retain_mut(|sst| {
388                let sst_split_type = need_to_split(sst, split_key.clone());
389                match sst_split_type {
390                    SstSplitType::Left => true,
391                    SstSplitType::Right => {
392                        insert_table_infos.push(sst.clone());
393                        false
394                    }
395                    SstSplitType::Both => {
396                        let sst_size = sst.sst_size;
397                        if sst_size / 2 == 0 {
398                            tracing::warn!(
399                                id = %sst.sst_id,
400                                object_id = %sst.object_id,
401                                sst_size = sst.sst_size,
402                                file_size = sst.file_size,
403                                "Sstable sst_size is under expected",
404                            );
405                        };
406
407                        let (left, right) = split_sst(
408                            sst.clone(),
409                            new_sst_id,
410                            split_key.clone(),
411                            sst_size / 2,
412                            sst_size / 2,
413                        );
414                        if let Some(branch_sst) = right {
415                            insert_table_infos.push(branch_sst);
416                        }
417
418                        if let Some(s) = left {
419                            *sst = s;
420                            true
421                        } else {
422                            false
423                        }
424                    }
425                }
426            });
427        } else {
428            let pos = get_split_pos(&level.table_infos, split_key.clone());
429            if pos >= level.table_infos.len() {
430                return insert_table_infos;
431            }
432
433            let sst_split_type = need_to_split(&level.table_infos[pos], split_key.clone());
434            match sst_split_type {
435                SstSplitType::Left => {
436                    insert_table_infos.extend_from_slice(&level.table_infos[pos + 1..]);
437                    level.table_infos = level.table_infos[0..=pos].to_vec();
438                }
439                SstSplitType::Right => {
440                    assert_eq!(0, pos);
441                    insert_table_infos.extend_from_slice(&level.table_infos[pos..]); // the sst at pos has been split to the right
442                    level.table_infos.clear();
443                }
444                SstSplitType::Both => {
445                    // split the sst
446                    let sst = level.table_infos[pos].clone();
447                    let sst_size = sst.sst_size;
448                    if sst_size / 2 == 0 {
449                        tracing::warn!(
450                            id = %sst.sst_id,
451                            object_id = %sst.object_id,
452                            sst_size = sst.sst_size,
453                            file_size = sst.file_size,
454                            "Sstable sst_size is under expected",
455                        );
456                    };
457
458                    let (left, right) = split_sst(
459                        sst,
460                        new_sst_id,
461                        split_key.clone(),
462                        sst_size / 2,
463                        sst_size / 2,
464                    );
465
466                    if let Some(branch_sst) = right {
467                        insert_table_infos.push(branch_sst);
468                    }
469
470                    let right_start = pos + 1;
471                    let left_end = pos;
472                    // the sst at pos has been split to both left and right
473                    // the branched sst has been inserted to the `insert_table_infos`
474                    insert_table_infos.extend_from_slice(&level.table_infos[right_start..]);
475                    level.table_infos = level.table_infos[0..=left_end].to_vec();
476                    if let Some(origin_sst) = left {
477                        // replace the origin sst with the left part
478                        level.table_infos[left_end] = origin_sst;
479                    } else {
480                        level.table_infos.pop();
481                    }
482                }
483            };
484        }
485
486        insert_table_infos
487    }
488}
489
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;

    use super::group_split::{
        build_split_key, split_table_ids_with_split_key, split_table_ids_with_table_id_and_vnode,
    };

    /// Build a `Vec<TableId>` from integer literals, converting each one with `Into<TableId>`.
    macro_rules! table_ids {
        ($($id:expr),* $(,)?) => {
            vec![$(Into::<TableId>::into($id)),*]
        };
    }

    #[test]
    fn test_split_table_ids() {
        let all = table_ids![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let vnode = VirtualNode::ZERO.to_index();

        // Ids below the split table id go left; the split id and above go right.
        let (left, right) = split_table_ids_with_table_id_and_vnode(&all, 5.into(), vnode);
        assert_eq!(left, table_ids![1, 2, 3, 4]);
        assert_eq!(right, table_ids![5, 6, 7, 8, 9]);

        // A split table id that is not in `table_ids` and is larger than all of them.
        let (left, right) = split_table_ids_with_table_id_and_vnode(&all, 10.into(), vnode);
        assert_eq!(left, table_ids![1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert!(right.is_empty());

        // A split table id smaller than every id moves everything to the right.
        let (left, right) = split_table_ids_with_table_id_and_vnode(&all, 0.into(), vnode);
        assert!(left.is_empty());
        assert_eq!(right, table_ids![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

    #[test]
    fn test_split_table_ids_with_split_key() {
        let all = table_ids![1, 2, 3, 4, 5, 6, 7, 8, 9];
        let split_key = build_split_key(5.into(), VirtualNode::ZERO);

        let (left, right) = split_table_ids_with_split_key(&all, split_key);
        assert_eq!(left, table_ids![1, 2, 3, 4]);
        assert_eq!(right, table_ids![5, 6, 7, 8, 9]);
    }
}